In [None]:
# importing some python modules - just in case
import pandas as pd
import numpy as np
import os, sys 
import io
import ee

In [None]:
# Widen pandas' display limits so large frames print without being truncated.
for option_name, option_value in (
    ('display.max_rows', 150),
    ('display.max_columns', 50),
    ('display.width', 1000),
):
    pd.set_option(option_name, option_value)

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# The input CSV path used later in this notebook lives under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Refresh the apt package index before installing the JDK in the next cell.
# Note: this only updates the package lists; it does not update pyspark itself.
!apt-get update

In [None]:
#pyspark
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark
# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"
# Start a SparkSession
import findspark
findspark.init()

In [None]:
# import likely needed modules
from pyspark import SparkFiles
from pyspark import SparkConf                                                                                                                 
from pyspark.context import SparkContext                                                                                                      
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col
from pyspark.sql.functions import avg
from pyspark.sql.functions import translate
from pyspark.sql.functions import desc

In [None]:
# from pyspark.sql.session import SparkSession
# Reuse the active SparkSession if there is one; otherwise create one with default settings.
spark = SparkSession.builder.getOrCreate()

In [None]:
#code for remount if needed
#drive.mount("/content/drive", force_remount=True)

In [None]:
# Location of the input CSV (clean_ppp_data.csv) under the Google Drive mount point;
# the drive must be mounted at /content/drive before this file can be read.
path = "/content/drive/My Drive/data/clean_ppp_data.csv"

In [None]:
# pandas check
# df=pd.read_csv(path)
# df.sample()

In [None]:
# needed in order to do large file save 
# %pip install py4j


### Import csv

In [None]:
# Load the CSV into a Spark DataFrame. No schema is supplied or inferred here,
# so every column is read as a string.
# SparkFiles.get() is meant for files registered with SparkContext.addFile(); it only
# worked before because `path` is absolute. Read the file from `path` directly.
df = spark.read.csv(path, sep=",", header=True)
df.show()

Check out the data and cast types if needed



In [None]:
# Make sure the NAICS code column is string-typed.
naics_as_text = col("NAICS Code").cast("string")
df = df.withColumn("NAICS Code", naics_as_text)


In [None]:
# Remove double quotes and periods from the State column.
# A single built-in regexp_replace replaces the two Python UDFs: it passes null
# values through unchanged (the lambdas raised AttributeError on None) and runs
# inside the JVM instead of serialising every row to a Python worker.
from pyspark.sql.functions import regexp_replace
df = df.withColumn('State', regexp_replace('State', r'[".]', ''))
df.show(2, truncate=False)


In [None]:
# df.printSchema()

In [None]:
# Create a general category column from the first two characters of the NAICS code.
# Column.substr takes a 1-based start position; the earlier start of 0 only worked
# because Spark treats position 0 the same as position 1.
df = df.withColumn('NAICS_cat', df['NAICS Code'].substr(1, 2))
df.show(5)

In [None]:
# df.describe()

In [None]:
# Full dataset sorted by the Gender column in ascending order.
gender = df.sort(col('Gender').asc())
# gender.show()

**Quick look at veterans, men and women**

In [None]:
# Row subsets selected by owner attributes: female-owned, male-owned and veteran.
womens = df.where(col('Gender') == 'Female Owned')
mens = df.where(col('Gender') == 'Male Owned')
vets = df.where(col('Veteran') == 'Veteran')

In [None]:
# Summary statistics (count, mean, stddev, min, max) for the veteran subset.
vets.describe().show()

In [None]:
# Preview the veteran subset (show() prints the first 20 rows by default).
vets.show()

In [None]:
# Summary statistics (count, mean, stddev, min, max) for the female-owned subset.
womens.describe().show()


In [None]:
# Summary statistics (count, mean, stddev, min, max) for the male-owned subset.
mens.describe().show()



In [None]:
# Mean of "Loan Amount" over the whole dataset.
# NOTE(review): the CSV is read without a schema, so this column is a string and the
# average relies on Spark's implicit numeric cast — confirm no values fail to parse.
df.select(avg("Loan Amount")).show()

In [None]:
# Row counts for each (Gender, Veteran) combination, smallest groups first.
gender_veteran_groups = df.groupBy('Gender', 'Veteran')
gen_vets = gender_veteran_groups.count().sort('count')
gen_vets.show(5)

In [None]:
# Row counts per Veteran value, smallest groups first.
veteran_groups = df.groupBy('Veteran')
vet_ct = veteran_groups.count().sort('count')
vet_ct.show()

In [None]:
# Row counts per Race value over the full dataset, largest groups first.
rc_ct = (
    df.groupBy('Race')
    .count()
    .orderBy(desc('count'))
)
rc_ct.show()

In [None]:
# Keep only rows whose Race value is something other than 'Unanswered'.
race_answered = col('Race') != 'Unanswered'
race = df.where(race_answered)
race.show(5)


In [None]:
# Rows where Veteran, Race and Gender were all answered (none is 'Unanswered').
people_df = (
    df.where(col('Veteran') != 'Unanswered')
    .where(col('Race') != 'Unanswered')
    .where(col('Gender') != 'Unanswered')
)
people_df.show(10)

In [None]:
# Row counts per Race value within the fully answered subset, largest groups first.
rc_ct = (
    people_df.groupBy('Race')
    .count()
    .orderBy(desc('count'))
)
rc_ct.show(5)

In [None]:
# Number of rows left after dropping 'Unanswered' Veteran/Race/Gender values.
people_df.count()

In [None]:
# Loan count per State value in the people dataframe; the final count() is the
# number of distinct State values (probably includes territories such as the Virgin Islands).
people_by_state = people_df.groupby("State")
state_loan_count = people_by_state.agg({"Loan Amount": "count"})
state_loan_count.count()





In [None]:
# Loan count per state from the smaller (people) dataset; show() prints the first 20 rows.
state_loan_count.show()

Send the "people" dataframe to csv.

In [None]:
# relatively small amount of rows, but looks like it will be interesting to work with 
# sending to file and will run visualizations in pandas/tableau and run machine learning on it

people_df.toPandas().to_csv("people_ppp.csv", header=True)
!cp people_ppp.csv "drive/My Drive/"

In [None]:
# Loan count per Bank in the main dataframe, most frequent first.
top_banks = (
    df.groupBy("Bank")
    .count()
    .orderBy(desc('count'))
)
top_banks.show(10)

In [None]:
# Loan count per full NAICS code in the main dataframe, most frequent first.
biz_code = (
    df.groupBy("NAICS Code")
    .count()
    .orderBy(desc('count'))
)
biz_code.show(10)

In [None]:
# Loan count per general NAICS category in the main dataframe, most frequent first.
# show() returns None, so keep the DataFrame in `top_cat_count` and display it
# separately; previously the variable ended up holding None.
top_cat_count = df.groupBy("NAICS_cat").count().orderBy('count', ascending=False)
top_cat_count.show(10)


In [None]:
# Verify missing-value removal: count null or NaN entries in every column.
# isnan() alone never flags nulls, and empty CSV fields load as null, so the
# null check is needed for this to catch anything that slipped through.

from pyspark.sql.functions import isnan, when, count, col

# The second argument of when() is the column-name string, which Spark treats as a
# non-null literal, so count() tallies exactly the rows that match the condition.
df.select([
    count(when(col(c).isNull() | isnan(col(c)), c)).alias(c)
    for c in df.columns
]).show()

In [None]:
# Preview the main dataframe (show() prints the first 20 rows by default).
df.show()

In [None]:
# Number of rows with a non-null NAICS_cat value.
df.filter(df['NAICS_cat'].isNotNull()).count()


In [None]:
# Number of rows with a non-null Gender value.
df.filter(df['Gender'].isNotNull()).count()

In [None]:
# Turn on Arrow-based transfer for Spark <-> pandas conversions such as toPandas().
# NOTE(review): this is set after the toPandas() export earlier in the notebook, so on a
# top-to-bottom run it does not affect that export — confirm whether it should move up.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [None]:
#write to csv with pyspark
# df.repartition(1).write.csv("it.csv", sep=',')
# !cp it.csv "drive/My Drive/"

In [None]:
# write to csv with pandas
# df.toPandas().to_csv('ppp_p.csv', index=0)
# !cp ppp_p.csv "drive/My Drive/"