# Setting Up PySpark


In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

In [3]:
import findspark
import string
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from random import randint
import time
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("Hackathon2021").getOrCreate()

sc = spark.sparkContext

sqlContext = SQLContext(sc)

# Kaggle: USA Charities Data

In [4]:
# directoryPath to where the csv files for USA Charities Data is 
directoryPath = '/content/drive/MyDrive/HackAThon2021/Data/Kaggle USA Charities Data/*.csv'

# We now have a dataframe df that has all of the entries from all 50 states.
df = sqlContext.read.option("header", "true").load('/content/drive/MyDrive/HackAThon2021/Data/Kaggle USA Charities Data/*.csv', 'csv')

In [7]:
# Count how many entries we have.
numEntries = df.count()
print(numEntries)

1372473


In [11]:
# Look at the Schema to look at what fields we are working with
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- EIN: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Telephone: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Website: string (nullable = true)
 |-- Facebook: string (nullable = true)
 |-- Twitter: string (nullable = true)
 |-- Income: string (nullable = true)
 |-- Assests: string (nullable = true)



In [12]:
# How many Categories do we have 
numCat = df.groupby('Category').count().count()

print(numCat)

288


In [9]:
df.show()

+--------------------+----------+--------------------+---------+--------------------+---------------+----------+--------------------+--------+-------+-----------+-----------+
|                Name|       EIN|            Category|Telephone|             Address|           City|     State|             Website|Facebook|Twitter|     Income|    Assests|
+--------------------+----------+--------------------+---------+--------------------+---------------+----------+--------------------+--------+-------+-----------+-----------+
|1 grain to 1000 g...|46-3802710|Food, Agriculture...|      N/A| 2120 AVY AVENUE ...|     menlo park|California|                null|    null|   null|      $0.00|      $0.00|
|   1 giant mind inc |45-3053826|Science and Techn...|      N/A| 13428 MAXELLA AV...|nmarina del rey|California|http://1giantmind...|    null|   null|  $4,000.00| $14,154.00|
|    1 ghana project |82-3648374|Arts, Culture and...|      N/A| 15413 BOLERO DR ...|        fontana|California|             

In [None]:
df.groupby('Telephone').count().show()
# Looks like the telephone sectino of this table is useless.

+---------+-------+
|Telephone|  count|
+---------+-------+
|     null|    738|
|      N/A|1371735|
+---------+-------+

None


In [5]:
# Drop the Telephone column
df = df.drop("Telephone")
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- EIN: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Website: string (nullable = true)
 |-- Facebook: string (nullable = true)
 |-- Twitter: string (nullable = true)
 |-- Income: string (nullable = true)
 |-- Assests: string (nullable = true)



# Charity Navigator Scores Expenses Dataset

In [6]:
# Load the csv into a dataframe

df2 = sqlContext.read.option("header", "true").load('/content/drive/MyDrive/HackAThon2021/Data/Charity Navigator Scores Expenses Dataset/CLEAN_charity_data.csv', 'csv')

In [None]:
print(df2.count())

8710


In [None]:
df2.groupBy("state").count().show()

+--------------------+-----+
|               state|count|
+--------------------+-----+
|              0.0325|    1|
|               65.32|    1|
|  David L. Kutchback|    1|
|           1240262.0|    1|
| 0.06860000000000001|    1|
|              0.0359|    1|
|Animal Rights, We...|    1|
|               0.147|    1|
|                0.07|    1|
|          31205313.0|    1|
|               92.14|    1|
|            237474.0|    1|
|                86.1|    1|
|                  AZ|  115|
|                  SC|   74|
|Saving the lives ...|    1|
|Diseases, Disorde...|    1|
|              0.0095|    1|
|           2544826.0|    1|
|Community Foundat...|    1|
+--------------------+-----+
only showing top 20 rows



In [None]:
# Look at the schema so we know what fields we're working with

df2.printSchema()

root
 |-- ascore: string (nullable = true)
 |-- category: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ein: string (nullable = true)
 |-- tot_exp: string (nullable = true)
 |-- admin_exp_p: string (nullable = true)
 |-- fund_eff: string (nullable = true)
 |-- fund_exp_p: string (nullable = true)
 |-- program_exp_p: string (nullable = true)
 |-- fscore: string (nullable = true)
 |-- leader: string (nullable = true)
 |-- leader_comp: string (nullable = true)
 |-- leader_comp_p: string (nullable = true)
 |-- motto: string (nullable = true)
 |-- name: string (nullable = true)
 |-- tot_rev: string (nullable = true)
 |-- score: string (nullable = true)
 |-- state: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- size: string (nullable = true)
 |-- program_exp: string (nullable = true)
 |-- fund_exp: string (nullable = true)
 |-- admin_exp: string (nullable = true)



# EIN for both data sets

In [8]:
# We will remove any hyphens in the EIN's so we can join on them

from pyspark.sql.functions import regexp_replace

new_df = df.withColumn('EIN', regexp_replace("EIN", "[^0-9]", ""))



In [15]:
new_df.show()
print(new_df.count())

+--------------------+---------+--------------------+---------+--------------------+---------------+----------+--------------------+--------+-------+-----------+-----------+
|                Name|      EIN|            Category|Telephone|             Address|           City|     State|             Website|Facebook|Twitter|     Income|    Assests|
+--------------------+---------+--------------------+---------+--------------------+---------------+----------+--------------------+--------+-------+-----------+-----------+
|1 grain to 1000 g...|463802710|Food, Agriculture...|      N/A| 2120 AVY AVENUE ...|     menlo park|California|                null|    null|   null|      $0.00|      $0.00|
|   1 giant mind inc |453053826|Science and Techn...|      N/A| 13428 MAXELLA AV...|nmarina del rey|California|http://1giantmind...|    null|   null|  $4,000.00| $14,154.00|
|    1 ghana project |823648374|Arts, Culture and...|      N/A| 15413 BOLERO DR ...|        fontana|California|                nul

In [9]:
print(new_df.filter("EIN = '463802710'").count())
# We know that we did it correctly

1


In [10]:
# Now we do the same thing to the other table
new_df2 = df2.withColumn('ein', regexp_replace("EIN", "[^0-9]", ""))

In [None]:
new_df2.show()

+------+--------------------+--------------------+---------+----------+--------------------+--------+--------------------+------------------+------+--------------------+-----------+--------------------+--------------------+--------------------+----------+-----+-----+--------------------+-----+------------------+------------------+------------------+
|ascore|            category|         description|      ein|   tot_exp|         admin_exp_p|fund_eff|          fund_exp_p|     program_exp_p|fscore|              leader|leader_comp|       leader_comp_p|               motto|                name|   tot_rev|score|state|         subcategory| size|       program_exp|          fund_exp|         admin_exp|
+------+--------------------+--------------------+---------+----------+--------------------+--------+--------------------+------------------+------+--------------------+-----------+--------------------+--------------------+--------------------+----------+-----+-----+--------------------+-----+--

In [None]:
print(new_df2.filter("EIN = '930642086'").count())
# We know we did it correctly

1


In [None]:
# How many entries do we have in new_df2?
print(new_df2.count())

8710


In [11]:
# Before we join, we want to drop the Name, State, and Category from the first df so we don't have duplicates.

new_df = new_df.drop('Name', 'Category', 'State')

In [12]:
# Join these two on the EIN's
result = new_df.join(new_df2, on=['EIN'], how='inner')

In [13]:
print(result.count())

7053


In [21]:
result.first()

Row(EIN='953667812', Telephone='N/A', Address=' 1650 LOS GAMOS SUITE 110  94903-1838', City='san rafael', Website=None, Facebook=None, Twitter=None, Income='$8,425,395.00', Assests='$9,151,328.00', ascore='96.0', category='Education', description="A leading college success nonprofit in California, 10,000 Degrees helps students from low-income backgrounds get to and through college: 85% of our students are the first in their families to go to college. Thanks to comprehensive personal support, academic counseling, and financial aid management, more than 80% of 10,000 Degrees four-year college students earn bachelor's degrees, compared to 31% of their peers nationally. At the same time, 10,000 Degrees community college students transfer to and graduate from four-year colleges at three times the national average. Unlike with most other college support organizations, there is no GPA requirement for students applying to 10,000 Degrees programs. Currently, 10,000 Degrees supports students in 

In [None]:
result.printSchema()

root
 |-- EIN: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Website: string (nullable = true)
 |-- Facebook: string (nullable = true)
 |-- Twitter: string (nullable = true)
 |-- Income: string (nullable = true)
 |-- Assests: string (nullable = true)
 |-- ascore: string (nullable = true)
 |-- category: string (nullable = true)
 |-- description: string (nullable = true)
 |-- tot_exp: string (nullable = true)
 |-- admin_exp_p: string (nullable = true)
 |-- fund_eff: string (nullable = true)
 |-- fund_exp_p: string (nullable = true)
 |-- program_exp_p: string (nullable = true)
 |-- fscore: string (nullable = true)
 |-- leader: string (nullable = true)
 |-- leader_comp: string (nullable = true)
 |-- leader_comp_p: string (nullable = true)
 |-- motto: string (nullable = true)
 |-- name: string (nullable = true)
 |-- tot_rev: string (nullable = true)
 |-- score: string (nullable = true)
 |-- state: string (nullable = true)
 |-- subca

In [22]:
#Show the counts of each size of organization. 
result.groupBy(['size']).count().show()

+--------------------+-----+
|                size|count|
+--------------------+-----+
|                86.1|    2|
|                  AZ|    4|
|         United Ways|    6|
|Multipurpose Huma...|    2|
|Gay & Lesbian All...|    2|
|              0.0025|    2|
| 0.04769999999999999|    2|
|             15707.0|    2|
|               90.13|    2|
|                  MN|    2|
|Helping people to...|    2|
|           4208870.0|    2|
|          Neat Stuff|    2|
|          15245808.0|    2|
|Youth Education P...|    4|
|                  NJ|    2|
|      Huston Commons|    2|
|               96.82|    2|
|               76.91|    2|
|              0.0274|    2|
+--------------------+-----+
only showing top 20 rows



In [None]:
#Write Out this new table to csv's

result.write.option("header", "true").csv("/content/drive/MyDrive/HackAThon2021/Data/CharityNavAndUSACharities")

AnalysisException: ignored

In [14]:
# Count how many categories we have. We will use this information in

temp = result.groupBy(["category"]).count()
temp.sort(F.col("count").desc()).show()

+--------------------+-----+
|            category|count|
+--------------------+-----+
|      Human Services| 1997|
|Arts, Culture, Hu...| 1042|
|              Health|  690|
|Community Develop...|  648|
|           Education|  552|
|       International|  470|
|             Animals|  386|
|            Religion|  345|
|         Environment|  332|
|Human and Civil R...|  235|
|Research and Publ...|  112|
|           fieldwork|    2|
|        more than 31|    2|
| that recognizes ...|    2|
| showcasing BGU's...|    2|
| the only nationa...|    2|
|              talent|    2|
|         memory care|    2|
| the Richmond Sym...|    2|
| PHG has grown to...|    2|
+--------------------+-----+
only showing top 20 rows



In [None]:
result.groupBy("state").count().sort(F.col("count").desc()).show()

+-----+-----+
|state|count|
+-----+-----+
|   CA|  952|
|   NY|  923|
|   FL|  400|
|   MA|  326|
|   IL|  295|
|   PA|  281|
|   OH|  237|
|   CO|  236|
|   WA|  203|
|   MN|  200|
|   GA|  196|
|   MI|  192|
|   MO|  192|
|   NC|  183|
|   MD|  158|
|   NJ|  154|
|   AZ|  144|
|   CT|  126|
|   IN|  124|
|   OR|  113|
+-----+-----+
only showing top 20 rows



In [15]:
# Now we will use Spark template table to run SQL queries.

# Create the temp table
result.registerTempTable("resultTemp");

num_of_small_charities = sqlContext.sql("SELECT * FROM resultTemp WHERE size = 'small'").count()

print(num_of_small_charities)

sqlContext.sql("SELECT * FROM resultTemp WHERE size = 'small'").show()

3144
+---------+--------------------+-------------+-------+--------+-------+-------------+--------------+------+--------------------+--------------------+---------+-------------------+--------+--------------------+------------------+------+--------------------+-----------+--------------------+--------------------+--------------------+---------+-----+-----+--------------------+-----+------------------+------------------+------------------+
|      EIN|             Address|         City|Website|Facebook|Twitter|       Income|       Assests|ascore|            category|         description|  tot_exp|        admin_exp_p|fund_eff|          fund_exp_p|     program_exp_p|fscore|              leader|leader_comp|       leader_comp_p|               motto|                name|  tot_rev|score|state|         subcategory| size|       program_exp|          fund_exp|         admin_exp|
+---------+--------------------+-------------+-------+--------+-------+-------------+--------------+------+------------

In [19]:
# Query for high transparency charities

avg_ascore = sqlContext.sql("SELECT AVG(ascore) as averge FROM resultTemp WHERE ascore < 100")

avg_ascore_num = avg_ascore.collect()[0][0]

avg_ascore.show()

# Any score above our average is considered a High Transparency Charity

highTrans = sqlContext.sql("SELECT * FROM resultTemp WHERE ascore > " + str(avg_ascore_num))

print(highTrans.count())

highTrans.show()

+-----------------+
|           averge|
+-----------------+
|87.90331918178309|
+-----------------+

5220
+---------+--------------------+-------------+-------+--------+-------+--------------+--------------+------+--------------------+--------------------+----------+-------------------+--------+--------------------+-------------------+------+--------------------+-----------+--------------------+--------------------+--------------------+----------+-----+-----+--------------------+-----+------------------+------------------+------------------+
|      EIN|             Address|         City|Website|Facebook|Twitter|        Income|       Assests|ascore|            category|         description|   tot_exp|        admin_exp_p|fund_eff|          fund_exp_p|      program_exp_p|fscore|              leader|leader_comp|       leader_comp_p|               motto|                name|   tot_rev|score|state|         subcategory| size|       program_exp|          fund_exp|         admin_exp|
+---------

In [20]:
# Query for low Financial Health charities

avg_fscore = sqlContext.sql("SELECT AVG(fscore) as averge FROM resultTemp WHERE fscore < 100")

avg_fscore_num = avg_fscore.collect()[0][0]

avg_fscore.show()

# Any score below our average is considered a Low Fanicial Health Charity

lowFanHealth = sqlContext.sql("SELECT * FROM resultTemp WHERE fscore < " + str(avg_fscore_num))

print(lowFanHealth.count())

lowFanHealth.show()

+-----------------+
|           averge|
+-----------------+
|81.82275422195409|
+-----------------+

2054
+---------+--------------------+-------------+---------------+--------+-------+---------------+--------------+------+--------------------+--------------------+-----------+-------------------+--------+--------------------+-------------------+------+--------------------+-----------+--------------------+--------------------+--------------------+-----------+-----+-----+--------------------+-----+------------------+------------------+------------------+
|      EIN|             Address|         City|        Website|Facebook|Twitter|         Income|       Assests|ascore|            category|         description|    tot_exp|        admin_exp_p|fund_eff|          fund_exp_p|      program_exp_p|fscore|              leader|leader_comp|       leader_comp_p|               motto|                name|    tot_rev|score|state|         subcategory| size|       program_exp|          fund_exp|        

In [21]:
# Query for High Rated charities
# Upoon investigation, I found that the some of the States were 86.1 adn 81.68 and were causing and invalid average so they were removed from the average computation. 

avg_score = sqlContext.sql("SELECT AVG(score) as average FROM resultTemp WHERE score < 100" )

avg_score.show()

avg_score_num = avg_score.collect()[0][0]

print(avg_score_num)

# Any score above our average is considered a High Ratedy Charity

highRated = sqlContext.sql("SELECT * FROM resultTemp WHERE score > " + str(avg_score_num))

print(highRated.count())

highRated.show()

+----------------+
|         average|
+----------------+
|85.3423696813352|
+----------------+

85.3423696813352
4445
+---------+--------------------+-------------+-------+--------+-------+--------------+---------------+------+--------------------+--------------------+----------+-------------------+--------+--------------------+------------------+------+--------------------+-----------+--------------------+--------------------+--------------------+----------+-----+-----+--------------------+-----+------------------+------------------+------------------+
|      EIN|             Address|         City|Website|Facebook|Twitter|        Income|        Assests|ascore|            category|         description|   tot_exp|        admin_exp_p|fund_eff|          fund_exp_p|     program_exp_p|fscore|              leader|leader_comp|       leader_comp_p|               motto|                name|   tot_rev|score|state|         subcategory| size|       program_exp|          fund_exp|         admin_exp

In [33]:
# Query for Highly Expense charities

avg_expense = sqlContext.sql("SELECT AVG(tot_exp) as averge FROM resultTemp")

avg_expense.show()

avg_expense_num = avg_expense.collect()[0][0]

print(avg_expense_num)

# Any score above our average is considered a Highly Expense Charity

highExpense = result.filter(result["tot_exp"] > avg_expense_num)

testp = sqlContext.sql("SELECT * FROM resultTemp WHERE tot_exp > " + str(avg_expense_num))

print(highExpense.count())

highExpense.show()
testp.show()
print(testp.count())

+--------------------+
|              averge|
+--------------------+
|1.4436565872562567E7|
+--------------------+

14436565.872562567
1053
+---------+---------+--------------------+-------------+-------+--------+-------+---------------+-----------------+------+--------------------+--------------------+-----------+--------------------+--------+--------------------+------------------+------+--------------------+-----------+--------------------+--------------------+--------------------+-----------+-----+-----+--------------------+----+------------------+------------------+------------------+
|      EIN|Telephone|             Address|         City|Website|Facebook|Twitter|         Income|          Assests|ascore|            category|         description|    tot_exp|         admin_exp_p|fund_eff|          fund_exp_p|     program_exp_p|fscore|              leader|leader_comp|       leader_comp_p|               motto|                name|    tot_rev|score|state|         subcategory|size|    

In [None]:
result.printSchema()

root
 |-- EIN: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Website: string (nullable = true)
 |-- Facebook: string (nullable = true)
 |-- Twitter: string (nullable = true)
 |-- Income: string (nullable = true)
 |-- Assests: string (nullable = true)
 |-- ascore: string (nullable = true)
 |-- category: string (nullable = true)
 |-- description: string (nullable = true)
 |-- tot_exp: string (nullable = true)
 |-- admin_exp_p: string (nullable = true)
 |-- fund_eff: string (nullable = true)
 |-- fund_exp_p: string (nullable = true)
 |-- program_exp_p: string (nullable = true)
 |-- fscore: string (nullable = true)
 |-- leader: string (nullable = true)
 |-- leader_comp: string (nullable = true)
 |-- leader_comp_p: string (nullable = true)
 |-- motto: string (nullable = true)
 |-- name: string (nullable = true)
 |-- tot_rev: string (nullable = true)
 |-- score: string (nullable = true)
 |-- state: string (nullable = true)
 |-- subca