In [1]:
spark

# Initial Setup (Install and import necessary packages)

In [2]:
!pip install gcsfs
!pip install openpyxl

[0m

In [3]:
import pandas as pd
import gcsfs
import calendar
from google.cloud import storage
from google.cloud.exceptions import NotFound
from pyspark.sql.functions import col, lower, instr, date_format, split, rand
from pyspark.sql.functions import monotonically_increasing_id # virtually the same as factorize() from pandas.

In [4]:
# Define the bucket we will save to. Many areas will reference this variable later
bucket = "ppp-loans-bucket"

# Read clean data into appropriate dataframes

In [5]:
ppp_df = spark.read.parquet("gs://ppp-loans-bucket/cleaned/part-*").sample(fraction=0.001,seed=42)

                                                                                

In [6]:
ppp_df.count()

                                                                                

10549

In [7]:
fs = gcsfs.GCSFileSystem(project='cis4400-group-project')
with fs.open("ppp-loans-bucket/cleaned/GDP.csv") as f:
    gdp_df = pd.read_csv(f)
gdp_df

Unnamed: 0,GeoFIPS,GeoName,Region,TableName,LineCode,IndustryClassification,Description,Unit,2017,2018,2019,2020,2021,2022
0,1000,Alabama,5.0,CAGDP1,1,...,Real GDP (thousands of chained 2017 dollars),Thousands of chained 2017 dollars,216615470,220808767,224944577,222081439,231892626,235807320
1,1000,Alabama,5.0,CAGDP1,2,...,Chain-type quantity indexes for real GDP,Quantity index,100,101.936,103.845,102.523,107.053,108.86
2,1000,Alabama,5.0,CAGDP1,3,...,Current-dollar GDP (thousands of current dolla...,Thousands of dollars,216615470,226263784,234526408,235118280,257986516,281569005
3,1001,"Autauga, AL",5.0,CAGDP1,1,...,Real GDP (thousands of chained 2017 dollars),Thousands of chained 2017 dollars,1762558,1787534,1730861,1722438,1727818,1929264
4,1001,"Autauga, AL",5.0,CAGDP1,2,...,Chain-type quantity indexes for real GDP,Quantity index,100,101.417,98.202,97.724,98.029,109.458
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9526,97000,Rocky Mountain,7.0,CAGDP1,2,...,Chain-type quantity indexes for real GDP,Quantity index,100,104.559,109.552,109.329,116.661,119.324
9527,97000,Rocky Mountain,7.0,CAGDP1,3,...,Current-dollar GDP (thousands of current dolla...,Thousands of dollars,681310123,730567674,776281078,781272363,880142487,974682556
9528,98000,Far West,8.0,CAGDP1,1,...,Real GDP (thousands of chained 2017 dollars),Thousands of chained 2017 dollars,3797440495,3956948041,4108525822,4048649569,4342903004,4385657757
9529,98000,Far West,8.0,CAGDP1,2,...,Chain-type quantity indexes for real GDP,Quantity index,100,104.2,108.192,106.615,114.364,115.49


In [8]:
fs = gcsfs.GCSFileSystem(project='cis4400-group-project')
with fs.open('ppp-loans-bucket/cleaned/NAICS.csv') as f:
    naics_df = pd.read_csv(f)
naics_df

Unnamed: 0,2022 NAICS US Code,2022 NAICS US Title,industry_type
0,11,"Agriculture, Forestry, Fishing and Hunting","Agriculture, Forestry, Fishing and Hunting"
1,111,Crop Production,"Agriculture, Forestry, Fishing and Hunting"
2,1111,Oilseed and Grain Farming,"Agriculture, Forestry, Fishing and Hunting"
3,11111,Soybean Farming,"Agriculture, Forestry, Fishing and Hunting"
4,111110,Soybean Farming,"Agriculture, Forestry, Fishing and Hunting"
...,...,...,...
2120,9281,National Security and International Affairs,Public Administration
2121,92811,National Security,Public Administration
2122,928110,National Security,Public Administration
2123,92812,International Affairs,Public Administration


# Checking columns

In [9]:
ppp_df.columns

['LoanNumber',
 'DateApproved',
 'SBAOfficeCode',
 'ProcessingMethod',
 'BorrowerName',
 'BorrowerAddress',
 'BorrowerCity',
 'BorrowerState',
 'BorrowerZip',
 'LoanStatusDate',
 'LoanStatus',
 'Term',
 'SBAGuarantyPercentage',
 'InitialApprovalAmount',
 'CurrentApprovalAmount',
 'UndisbursedAmount',
 'FranchiseName',
 'ServicingLenderLocationID',
 'ServicingLenderName',
 'ServicingLenderAddress',
 'ServicingLenderCity',
 'ServicingLenderState',
 'ServicingLenderZip',
 'RuralUrbanIndicator',
 'HubzoneIndicator',
 'LMIIndicator',
 'BusinessAgeDescription',
 'ProjectCity',
 'ProjectCountyName',
 'ProjectState',
 'ProjectZip',
 'CD',
 'JobsReported',
 'NAICSCode',
 'Race',
 'Ethnicity',
 'PAYROLL_PROCEED',
 'BusinessType',
 'OriginatingLenderLocationID',
 'OriginatingLender',
 'OriginatingLenderCity',
 'OriginatingLenderState',
 'Gender',
 'Veteran',
 'NonProfit',
 'ForgivenessAmount',
 'ForgivenessDate',
 'forgiveness_date',
 'date_approved',
 'loan_status_date']

In [10]:
gdp_df.columns

Index(['GeoFIPS', 'GeoName', 'Region', 'TableName', 'LineCode',
       'IndustryClassification', 'Description', 'Unit', '2017', '2018', '2019',
       '2020', '2021', '2022'],
      dtype='object')

In [11]:
naics_df.columns

Index(['2022 NAICS US Code', '2022 NAICS US Title', 'industry_type'], dtype='object')

# Function to turn column names to snake_case

In [12]:
#Turns any columns that has TheseColumnsNames to these_column_names
import re
from pyspark.sql import DataFrame as PySparkDataFrame
def name_to_snake_case(name):
    # Add underscores before capital letters, then convert to lowercase
    return re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()

#Takes in any type of dataframe and turns its columns into snake_case
def df_to_snake_case(df):
    
    #Transforms PySpark dataframe
    if isinstance(df, PySparkDataFrame):
        print("Transforming PySpark DataFrame...")
        # Rename all columns to snake_case
        snake_case_columns = [name_to_snake_case(col) for col in df.columns]
        df_snake_case = df.toDF(*snake_case_columns)
        print("Completed transforming PySpark DataFrame.")
        return df_snake_case
    
    #Transforms Pandas dataframe
    elif isinstance(df, pd.DataFrame):
        print("Transforming Pandas DataFrame...")
        # Rename all columns to snake_case
        df.columns = [name_to_snake_case(col) for col in df.columns]
        print("Completed transforming Pandas DataFrame.")
        return df
    
    #No valid dataframe found
    else:
        print("ERROR: INVALID DATAFRAME - NO PROCEDURE APPLIED")


In [13]:
# initialize array. Append all dims and tables here. At the end, we loop through this list to save all dataframes.
tables = []

In [14]:
# Function to check how unique a column is. This will be used to verify all ID's in an ID column are unique.
def check_uniqueness(df, col_name):
    if isinstance(df, PySparkDataFrame):
        total_count = df.count()
        distinct_count = df.select(col_name).distinct().count()
    elif isinstance(df, pd.DataFrame):
        total_count = len(df)
        distinct_count = df[col_name].nunique()

    if total_count == distinct_count:
        print(f"The column '{col_name}' contains all unique values.")
    else:
        print(f"The column '{col_name}' contains duplicates. Total rows: {total_count}, Distinct rows: {distinct_count}")

# Dim Borrower

In [15]:
#Create Dim
dim_borrower = ppp_df.select("LoanNumber","BorrowerName","BorrowerAddress",
                            "BorrowerCity","BorrowerState","BorrowerZip",
                            "FranchiseName","BusinessAgeDescription",
                            "Race","Ethnicity","BusinessType","Gender",
                            "Veteran","NonProfit")

#Fix Names
dim_borrower = dim_borrower.withColumnRenamed("LoanNumber","borrower_id")
dim_borrower = df_to_snake_case(dim_borrower)

#Add to tables
tables.append([dim_borrower,"dim_borrower"])

#Show
dim_borrower.show()
dim_borrower.columns

Transforming PySpark DataFrame...
Completed transforming PySpark DataFrame.


                                                                                

+-----------+--------------------+--------------------+---------------+--------------+------------+--------------------+------------------------+----------+--------------------+--------------------+------------+-----------+----------+
|borrower_id|       borrower_name|    borrower_address|  borrower_city|borrower_state|borrower_zip|      franchise_name|business_age_description|      race|           ethnicity|       business_type|      gender|    veteran|non_profit|
+-----------+--------------------+--------------------+---------------+--------------+------------+--------------------+------------------------+----------+--------------------+--------------------+------------+-----------+----------+
| 1966387202|ORTHOSPORT HAWAII...|   5216 -A Hao Place|       Honolulu|            HI|  96821-1653|                NULL|    Existing or more ...|Unanswered|   Unknown/NotStated|Limited  Liabilit...|  Unanswered| Unanswered|      NULL|
| 3441317101|   KAMAKA HAWAII INC|        550 SOUTH ST|     

['borrower_id',
 'borrower_name',
 'borrower_address',
 'borrower_city',
 'borrower_state',
 'borrower_zip',
 'franchise_name',
 'business_age_description',
 'race',
 'ethnicity',
 'business_type',
 'gender',
 'veteran',
 'non_profit']

# Dim SBA office

In [16]:
#Create Dim
dim_SBA_office = ppp_df.select("SBAOfficeCode").distinct().toPandas()

#Fix Names
dim_SBA_office = df_to_snake_case(dim_SBA_office)
dim_SBA_office.rename(columns={'s_b_a_office_code':'SBA_office_code'}, inplace=True)

#Add to tables
tables.append([dim_SBA_office,"dim_SBA_office"])

#Show
dim_SBA_office



Transforming Pandas DataFrame...
Completed transforming Pandas DataFrame.


                                                                                

Unnamed: 0,SBA_office_code
0,876
1,474
2,678
3,639
4,1013
...,...
70,766
71,459
72,669
73,885


# Dim Processing Method

In [17]:
#Create Dim
dim_processing_method = ppp_df.select("ProcessingMethod").distinct().toPandas()

#Fix Names
dim_processing_method = df_to_snake_case(dim_processing_method)

#Add to tables
tables.append([dim_processing_method,"dim_processing_method"])

#Show
dim_processing_method



Transforming Pandas DataFrame...
Completed transforming Pandas DataFrame.


                                                                                

Unnamed: 0,processing_method
0,PPP
1,PPS


# Dim Originating Lender

In [18]:
#Create Dim
dim_originating_lender = ppp_df.select("OriginatingLenderLocationID",
                                      "OriginatingLender","OriginatingLenderCity",
                                      "OriginatingLenderState")

#Fix Names
dim_originating_lender = df_to_snake_case(dim_originating_lender)
dim_originating_lender = dim_originating_lender.withColumnRenamed("originating_lender_location_i_d","originating_lender_location_id")

#Add to tables
tables.append([dim_originating_lender,"dim_originating_lender"])

#Show
dim_originating_lender.show()

Transforming PySpark DataFrame...
Completed transforming PySpark DataFrame.
+------------------------------+--------------------+-----------------------+------------------------+
|originating_lender_location_id|  originating_lender|originating_lender_city|originating_lender_state|
+------------------------------+--------------------+-----------------------+------------------------+
|                         20088| First Hawaiian Bank|               HONOLULU|                      HI|
|                         20088| First Hawaiian Bank|               HONOLULU|                      HI|
|                         19948|      Bank of Hawaii|               HONOLULU|                      HI|
|                         19990|Central Pacific Bank|               HONOLULU|                      HI|
|                         19990|Central Pacific Bank|               HONOLULU|                      HI|
|                         20088| First Hawaiian Bank|               HONOLULU|                      H

# Dim Servicing Lender

In [19]:
#Create Dim
dim_servicing_lender = ppp_df.select('ServicingLenderLocationID', 'ServicingLenderName',
                                     'ServicingLenderAddress', 'ServicingLenderCity', 
                                     'ServicingLenderState','ServicingLenderZip')

#Fix Names
dim_servicing_lender = df_to_snake_case(dim_servicing_lender)

#Add to tables
tables.append([dim_servicing_lender,"dim_servicing_lender"])

#Show
dim_servicing_lender.show()

Transforming PySpark DataFrame...
Completed transforming PySpark DataFrame.
+-----------------------------+---------------------+------------------------+---------------------+----------------------+--------------------+
|servicing_lender_location_i_d|servicing_lender_name|servicing_lender_address|servicing_lender_city|servicing_lender_state|servicing_lender_zip|
+-----------------------------+---------------------+------------------------+---------------------+----------------------+--------------------+
|                        20088|  First Hawaiian Bank|           999 Bishop St|             HONOLULU|                    HI|          96813-4423|
|                        20088|  First Hawaiian Bank|           999 Bishop St|             HONOLULU|                    HI|          96813-4423|
|                        19948|       Bank of Hawaii|              111 S King|             HONOLULU|                    HI|          96813-3501|
|                        19990| Central Pacific Bank| 

# Dim Date

In [20]:
def week_of_month(dt):
    year = dt.year
    month = dt.month
    day = dt.day
    
    cal = calendar.monthcalendar(year,month)
    week_number = (day - 1) // 7 + 1
    return week_number

    
#Create Dim
start_date = pd.to_datetime("2017-01-01")
end_date = pd.to_datetime("2022-12-31")

dim_date = pd.DataFrame({"date": pd.date_range(start_date, end_date, freq="D")})

dim_date['date_id'] = dim_date['date'].dt.strftime('%Y%m%d')
dim_date['year_number'] = dim_date['date'].dt.year
dim_date['month_number'] = dim_date['date'].dt.month
dim_date['day_number'] = dim_date['date'].dt.day
dim_date['week_number'] = dim_date['date'].dt.strftime('%V')
dim_date['week_of_month'] = dim_date['date'].apply(week_of_month)
dim_date['week_of_year'] = dim_date['date'].dt.strftime('%U')
dim_date['month_name'] = dim_date['date'].dt.strftime('%B')
dim_date['day_name'] = dim_date['date'].dt.strftime('%A')
dim_date['timestamp_isoformat'] = dim_date['date'].apply(lambda x: x.isoformat())

#Add to tables
tables.append([dim_date,"dim_date"])

#Show
#print(dim_date.shape)
#print(dim_date.info())
#print(dim_date.head())
dim_date.head()

Unnamed: 0,date,date_id,year_number,month_number,day_number,week_number,week_of_month,week_of_year,month_name,day_name,timestamp_isoformat
0,2017-01-01,20170101,2017,1,1,52,1,1,January,Sunday,2017-01-01T00:00:00
1,2017-01-02,20170102,2017,1,2,1,1,1,January,Monday,2017-01-02T00:00:00
2,2017-01-03,20170103,2017,1,3,1,1,1,January,Tuesday,2017-01-03T00:00:00
3,2017-01-04,20170104,2017,1,4,1,1,1,January,Wednesday,2017-01-04T00:00:00
4,2017-01-05,20170105,2017,1,5,1,1,1,January,Thursday,2017-01-05T00:00:00


# Dim Census Location


In [21]:
#Create Dim
dim_census_location = ppp_df.select('ProjectCity','ProjectCountyName', 
                                    'ProjectState', 'ProjectZip','CD')

#Fix Names
dim_census_location = dim_census_location.withColumnRenamed("CD","cd")
dim_census_location = df_to_snake_case(dim_census_location)

#Add unique ID
dim_census_location = dim_census_location.withColumn("census_location_id", monotonically_increasing_id()+1)

#Add to tables
tables.append([dim_census_location,"dim_census_location"])

#Show
dim_census_location.show()

#Count
print("Records in dim_census_location: "+str(dim_census_location.count()))

Transforming PySpark DataFrame...
Completed transforming PySpark DataFrame.
+---------------+-------------------+-------------+-----------+-----+------------------+
|   project_city|project_county_name|project_state|project_zip|   cd|census_location_id|
+---------------+-------------------+-------------+-----------+-----+------------------+
|       Honolulu|           HONOLULU|           HI| 96821-1653|HI-01|                 1|
|       HONOLULU|           HONOLULU|           HI| 96813-5010|HI-01|                 2|
|       Honolulu|           HONOLULU|           HI| 96826-1079|HI-01|                 3|
|        KAHULUI|               MAUI|           HI| 96732-2926|HI-02|                 4|
|       Mililani|           HONOLULU|           HI| 96789-8094|HI-01|                 5|
|       Honolulu|           HONOLULU|           HI| 96815-2354|HI-01|                 6|
|    PRINCEVILLE|              KAUAI|           HI|      96722|HI-02|                 7|
| MOUNT PLEASANT|              HEN



Records in dim_census_location: 10549


                                                                                

# Dim Industry

In [22]:
naics_df

Unnamed: 0,2022 NAICS US Code,2022 NAICS US Title,industry_type
0,11,"Agriculture, Forestry, Fishing and Hunting","Agriculture, Forestry, Fishing and Hunting"
1,111,Crop Production,"Agriculture, Forestry, Fishing and Hunting"
2,1111,Oilseed and Grain Farming,"Agriculture, Forestry, Fishing and Hunting"
3,11111,Soybean Farming,"Agriculture, Forestry, Fishing and Hunting"
4,111110,Soybean Farming,"Agriculture, Forestry, Fishing and Hunting"
...,...,...,...
2120,9281,National Security and International Affairs,Public Administration
2121,92811,National Security,Public Administration
2122,928110,National Security,Public Administration
2123,92812,International Affairs,Public Administration


In [23]:
#Create Dim
dim_industry = pd.DataFrame()
dim_industry['NAICS_code'] = naics_df['2022 NAICS US Code']
dim_industry['industry_name'] = naics_df['2022 NAICS US Title']
dim_industry['industry_type'] = naics_df['industry_type']

#Add unique ID
dim_industry['industry_id'] = pd.factorize(dim_industry['NAICS_code'])[0] + 1

#Add to tables
tables.append([dim_industry,"dim_industry"])

#Show
dim_industry.head()

Unnamed: 0,NAICS_code,industry_name,industry_type,industry_id
0,11,"Agriculture, Forestry, Fishing and Hunting","Agriculture, Forestry, Fishing and Hunting",1
1,111,Crop Production,"Agriculture, Forestry, Fishing and Hunting",2
2,1111,Oilseed and Grain Farming,"Agriculture, Forestry, Fishing and Hunting",3
3,11111,Soybean Farming,"Agriculture, Forestry, Fishing and Hunting",4
4,111110,Soybean Farming,"Agriculture, Forestry, Fishing and Hunting",5


# Facts GDP

In [24]:
#Create facts
facts_gdp = gdp_df

#Fix names
facts_gdp = df_to_snake_case(facts_gdp)
facts_gdp.rename(columns={'geo_f_i_p_s':'geofips'}, inplace=True)

#Drop unnecessary columns
facts_gdp.drop('industry_classification',axis=1,inplace=True)

#Show
facts_gdp

Transforming Pandas DataFrame...
Completed transforming Pandas DataFrame.


Unnamed: 0,geofips,geo_name,region,table_name,line_code,description,unit,2017,2018,2019,2020,2021,2022
0,1000,Alabama,5.0,CAGDP1,1,Real GDP (thousands of chained 2017 dollars),Thousands of chained 2017 dollars,216615470,220808767,224944577,222081439,231892626,235807320
1,1000,Alabama,5.0,CAGDP1,2,Chain-type quantity indexes for real GDP,Quantity index,100,101.936,103.845,102.523,107.053,108.86
2,1000,Alabama,5.0,CAGDP1,3,Current-dollar GDP (thousands of current dolla...,Thousands of dollars,216615470,226263784,234526408,235118280,257986516,281569005
3,1001,"Autauga, AL",5.0,CAGDP1,1,Real GDP (thousands of chained 2017 dollars),Thousands of chained 2017 dollars,1762558,1787534,1730861,1722438,1727818,1929264
4,1001,"Autauga, AL",5.0,CAGDP1,2,Chain-type quantity indexes for real GDP,Quantity index,100,101.417,98.202,97.724,98.029,109.458
...,...,...,...,...,...,...,...,...,...,...,...,...,...
9526,97000,Rocky Mountain,7.0,CAGDP1,2,Chain-type quantity indexes for real GDP,Quantity index,100,104.559,109.552,109.329,116.661,119.324
9527,97000,Rocky Mountain,7.0,CAGDP1,3,Current-dollar GDP (thousands of current dolla...,Thousands of dollars,681310123,730567674,776281078,781272363,880142487,974682556
9528,98000,Far West,8.0,CAGDP1,1,Real GDP (thousands of chained 2017 dollars),Thousands of chained 2017 dollars,3797440495,3956948041,4108525822,4048649569,4342903004,4385657757
9529,98000,Far West,8.0,CAGDP1,2,Chain-type quantity indexes for real GDP,Quantity index,100,104.2,108.192,106.615,114.364,115.49


In [25]:
#Temporary dataframe to create a connection between facts_gdp and dim_census_location
facts_gdp_spark = spark.createDataFrame(facts_gdp)


#Split the geo_name column so we can better join the two tables through county name.
facts_gdp_spark = facts_gdp_spark.withColumn("geo_name_split", split(col("geo_name"), ",")[0])

facts_gpd_spark_before_count = facts_gdp_spark.count()

#Join the tables through the county name. Left join since we dont want to get rid of any data in facts_gdp
facts_gdp_spark = facts_gdp_spark.join(dim_census_location, 
                                 lower(facts_gdp_spark.geo_name_split) == lower(dim_census_location.project_county_name),
                                "inner")

#Show come columns to confirm that the tables joined correctly (if geo_name is a county, match it and add the census id)
facts_gdp_spark.select("geo_name","project_county_name","census_location_id","2019").orderBy(rand()).limit(10).show()


                                                                                

+------------+-------------------+------------------+---------+
|    geo_name|project_county_name|census_location_id|     2019|
+------------+-------------------+------------------+---------+
|   Essex, MA|              ESSEX|       42949673826| 46020521|
|   Clark, IN|              CLARK|       68719476935|  101.697|
| Madison, MO|            MADISON|       25769804308|  107.159|
|   Union, IL|              UNION|       25769803992|   419360|
|    Knox, TX|               KNOX|       60129542251|   141465|
| Tarrant, TX|            TARRANT|       34359738904|118947910|
|   Lamar, AL|              LAMAR|       25769804509|   339487|
|San Juan, CO|           SAN JUAN|       17179870365|    36106|
|   Scott, IA|              SCOTT|       17179870009|  9591035|
|    Cass, ND|               CASS|       34359739026| 13905366|
+------------+-------------------+------------------+---------+



[Stage 22:>                                                         (0 + 8) / 8]                                                                                

In [26]:
county_name = r"\bMARION\b"
facts_gdp_spark_2 = spark.createDataFrame(facts_gdp)

facts_gdp_spark.filter(lower(col("geo_name")).rlike(county_name.lower())).select("geo_name","project_county_name",
                                                                                   "census_location_id","2019").show()
print(facts_gdp_spark.filter(lower(col("geo_name")).rlike(county_name.lower())).select("geo_name","project_county_name",
                                                                                   "census_location_id","2019").count())

facts_gdp_spark_2.filter(lower(col("geo_name")).rlike(county_name.lower())).select("geo_name","2019").show()
print(facts_gdp_spark_2.filter(lower(col("geo_name")).rlike(county_name.lower())).select("geo_name","2019").count())

dim_census_location.filter(lower(col("project_county_name"))\
                           .rlike(county_name.lower()))\
                            .select("project_county_name","census_location_id","cd").show()
print(dim_census_location.filter(lower(col("project_county_name"))\
                           .rlike(county_name.lower()))\
                            .select("project_county_name","census_location_id","cd").count())

                                                                                

+----------+-------------------+------------------+------+
|  geo_name|project_county_name|census_location_id|  2019|
+----------+-------------------+------------------+------+
|Marion, AL|             MARION|       68719477870|876567|
|Marion, AL|             MARION|       68719477746|876567|
|Marion, AL|             MARION|       60129543260|876567|
|Marion, AL|             MARION|       60129543222|876567|
|Marion, AL|             MARION|       60129543212|876567|
|Marion, AL|             MARION|       60129543208|876567|
|Marion, AL|             MARION|       60129543127|876567|
|Marion, AL|             MARION|       60129542238|876567|
|Marion, AL|             MARION|       60129542218|876567|
|Marion, AL|             MARION|       51539608601|876567|
|Marion, AL|             MARION|       51539608556|876567|
|Marion, AL|             MARION|       51539608426|876567|
|Marion, AL|             MARION|       51539608388|876567|
|Marion, AL|             MARION|       51539608371|87656

                                                                                

3519
+----------+--------+
|  geo_name|    2019|
+----------+--------+
|Marion, AL|  876567|
|Marion, AL| 100.701|
|Marion, AL|  915689|
|Marion, AR|  570355|
|Marion, AR| 113.719|
|Marion, AR|  591863|
|Marion, FL|10291593|
|Marion, FL| 105.406|
|Marion, FL|10762229|
|Marion, GA|  108748|
|Marion, GA|  95.168|
|Marion, GA|  114567|
|Marion, IL| 1325981|
|Marion, IL| 100.278|
|Marion, IL| 1385156|
|Marion, IN|95126100|
|Marion, IN| 105.593|
|Marion, IN|98738022|
|Marion, IA| 2205597|
|Marion, IA| 114.456|
+----------+--------+
only showing top 20 rows

51


                                                                                

+-------------------+------------------+-----+
|project_county_name|census_location_id|   cd|
+-------------------+------------------+-----+
|             MARION|                17|IA-01|
|             MARION|                67|IN-07|
|             MARION|                76|IN-07|
|             MARION|                78|IN-07|
|             MARION|                80|IN-06|
|             MARION|                81|IN-07|
|             MARION|               404|OR-06|
|             MARION|        8589935574|TN-04|
|             MARION|       17179869433|OR-06|
|             MARION|       17179869451|OR-06|
|             MARION|       17179869459|OR-06|
|             MARION|       17179869466|OR-05|
|             MARION|       17179869471|OR-06|
|             MARION|       17179869481|OR-06|
|             MARION|       17179869496|OR-05|
|             MARION|       17179869947|IA-01|
|             MARION|       17179870005|IA-01|
|             MARION|       25769804038|IN-07|
|            



69


                                                                                

In [27]:
#Drop the columns facts_gdp no longer needs
facts_gdp_spark = facts_gdp_spark.drop("geo_name_split", "project_city","project_county_name",
                                      "project_state","project_zip","cd")

#Recheck the columns to make sure everyting is correct
print(facts_gdp_spark.columns)

facts_gpd_spark_after_count = facts_gdp_spark.count()
facts_gpd_pandas_count =len(facts_gdp)
dim_census_location_count = dim_census_location.count()

print("Fact_GPD_SPARK_BEFORE: "+str(facts_gpd_spark_before_count))
print("Fact_GPD_SPARK_AFTER: "+str(facts_gpd_spark_after_count))
print("Fact_GPD_PANDAS: "+str(facts_gpd_pandas_count))
print("DIM_Census_Location: "+str(dim_census_location_count))

#Overwrite the orignal facts_gdp with the updated one that contains the census_location_id
#facts_gdp = facts_gdp_spark.toPandas()
#del facts_gdp_spark

['geofips', 'geo_name', 'region', 'table_name', 'line_code', 'description', 'unit', '2017', '2018', '2019', '2020', '2021', '2022', 'census_location_id']




Fact_GPD_SPARK_BEFORE: 9531
Fact_GPD_SPARK_AFTER: 127020
Fact_GPD_PANDAS: 9531
DIM_Census_Location: 10549


                                                                                

In [28]:
#Check how many of facts_gdp is null after join
!pip install handyspark
import numpy as np
np.bool = np.bool_
from handyspark import *
hsdf = HandyFrame(facts_gdp_spark)
hsdf.isnull()

[0m

                                                                                

geofips               0
geo_name              0
region                0
table_name            0
line_code             0
description           0
unit                  0
2017                  0
2018                  0
2019                  0
2020                  0
2021                  0
2022                  0
census_location_id    0
Name: missing, dtype: int64

In [29]:
#Add unique ID
facts_gdp_spark = facts_gdp_spark.withColumn("facts_gdp_id", monotonically_increasing_id()+1)

#Add to tables
tables.append([facts_gdp_spark,"facts_gdp"])

# Facts PPP Loans

In [30]:
dim_industry.head()

Unnamed: 0,NAICS_code,industry_name,industry_type,industry_id
0,11,"Agriculture, Forestry, Fishing and Hunting","Agriculture, Forestry, Fishing and Hunting",1
1,111,Crop Production,"Agriculture, Forestry, Fishing and Hunting",2
2,1111,Oilseed and Grain Farming,"Agriculture, Forestry, Fishing and Hunting",3
3,11111,Soybean Farming,"Agriculture, Forestry, Fishing and Hunting",4
4,111110,Soybean Farming,"Agriculture, Forestry, Fishing and Hunting",5


In [31]:
#small function to convert date columns to date_id to connect to dim_date table.
def date_to_id(df, column_name):
    df = df.withColumn(column_name+"_id", date_format(column_name, "yyyyMMdd"))
    df = df.drop(column_name)
    return df


#Create facts
facts_PPP_loans = ppp_df.select("SBAGuarantyPercentage","InitialApprovalAmount","CurrentApprovalAmount",
                               "UndisbursedAmount","ForgivenessAmount","LoanNumber","LoanStatus","Term",
                               "RuralUrbanIndicator","HubzoneIndicator","LMIIndicator","JobsReported",
                                "date_approved", "loan_status_date","forgiveness_date",
                               "SBAOfficeCode","ServicingLenderLocationID","OriginatingLenderLocationID",
                               "ProcessingMethod","NAICSCode")

#Convert date cols to date_ids so we can connect it to the dim date if needed
facts_PPP_loans = date_to_id(facts_PPP_loans,"date_approved")
facts_PPP_loans = date_to_id(facts_PPP_loans,"loan_status_date")
facts_PPP_loans = date_to_id(facts_PPP_loans,"forgiveness_date")


#Fix names
facts_PPP_loans = df_to_snake_case(facts_PPP_loans)
facts_PPP_loans = facts_PPP_loans.withColumnRenamed("s_b_a_office_code","sba_office_code")
facts_PPP_loans = facts_PPP_loans.withColumnRenamed("s_b_a_guaranty_percentage","sba_guaranty_percentage")
facts_PPP_loans = facts_PPP_loans.withColumnRenamed("servicing_lender_location_i_d","servicing_lender_location_id")
facts_PPP_loans = facts_PPP_loans.withColumnRenamed("originating_lender_location_i_d","originating_lender_location_id")
facts_PPP_loans = facts_PPP_loans.withColumnRenamed("l_m_i_indicator","lmi_indicator")
facts_PPP_loans = facts_PPP_loans.withColumnRenamed("n_a_i_c_s_code","NAICS_code")


#Add foreign IDs
facts_PPP_loans = facts_PPP_loans.withColumn("census_location_id", monotonically_increasing_id()+1) 
facts_PPP_loans = facts_PPP_loans.withColumn("borrower_id", col("loan_number")) 

#Adding the industry_id as a foreign key.
#- 1st, join facts dataframe with dim_industry on the naics code
facts_PPP_loans = facts_PPP_loans.join(spark.createDataFrame(dim_industry), 
                                       on="NAICS_code", how="inner")
#- 2nd, drop all columns from dim_industry beside the industry id
facts_PPP_loans = facts_PPP_loans.drop("NAICS_code","industry_type","industry_name")

#- 3rd, rename industry_id to NAICS_code_industry_id
facts_PPP_loans = facts_PPP_loans.withColumnRenamed("industry_id","NAICS_code_industry_id")

#Add unique IDs
facts_PPP_loans = facts_PPP_loans.withColumn("fact_ppp_id", monotonically_increasing_id()+1)

#Add to tables
tables.append([facts_PPP_loans,"facts_PPP_loans"])

#Show
facts_PPP_loans.columns

Transforming PySpark DataFrame...
Completed transforming PySpark DataFrame.


['sba_guaranty_percentage',
 'initial_approval_amount',
 'current_approval_amount',
 'undisbursed_amount',
 'forgiveness_amount',
 'loan_number',
 'loan_status',
 'term',
 'rural_urban_indicator',
 'hubzone_indicator',
 'lmi_indicator',
 'jobs_reported',
 'sba_office_code',
 'servicing_lender_location_id',
 'originating_lender_location_id',
 'processing_method',
 'date_approved_id',
 'loan_status_date_id',
 'forgiveness_date_id',
 'census_location_id',
 'borrower_id',
 'NAICS_code_industry_id',
 'fact_ppp_id']

# Functions for saving

Note: All functions assume we have global variable "bucket" that contains the name of the bucket we are working with

In [32]:
def check_folder_exists(folder_name):
    
    # Grab the bucket
    client = storage.Client()
    bucket_ = client.get_bucket(bucket)
    
    # List objects in the bucket with the folder prefix
    blobs = bucket_.list_blobs(prefix=folder_name + '/')
    
    # Check if any blob exists under this folder
    for blob in blobs:
        if blob.name.startswith(folder_name + '/'):
            print(f"Folder '{folder_name}' exists in bucket '{bucket}'.")
            return True
    
    print(f"Folder '{folder_name}' does not exist in bucket '{bucket}'.")
    return False

In [33]:
def create_folder(path, name):
    
    if check_folder_exists(name): 
        print(path+name+'/'+' already exists. Abandoning folder creation')
        return
    
    # Initialize the GCS client
    client = storage.Client()

    # Get the bucket
    _bucket = client.get_bucket(bucket)

    # Define the folder path
    folder_path = path+name+'/'

    # Create a dummy file in the folder (this will simulate the folder in GCS)
    # The 'blob' will not be a real file, but will create the "folder"
    blob = _bucket.blob(folder_path + 'placeholder.txt')  # You can name it anything, like 'placeholder.txt'

    # Upload an empty string or any content to simulate the folder creation
    blob.upload_from_string('')

    print(f"Folder '{folder_path}' created successfully in the bucket '{bucket}'")

In [34]:
def delete_files_named(name_of_file_to_delete):
    # Initialize a client
    client = storage.Client()

    # Specify the bucket name
    _bucket = client.bucket(bucket)

    # List and delete all files named "placeholder.txt"
    blobs = _bucket.list_blobs()

    count_deleted = 0
    for blob in blobs:
        if blob.name.endswith(name_of_file_to_delete):
            blob.delete()
            print(f"Deleted: {blob.name}")
            count_deleted += 1
            
    
    print(f"Deletion of all '{name_of_file_to_delete}' files is complete. Total deleted: "+str(count_deleted))

In [35]:
def save_df(df,name,save_as="parquet",gcs_path=None):

    #we must define the bucket variable at the start of the script, before calling this function.
    
    # Define the GCS path for saving the file
    if gcs_path == None:
        gcs_path = 'gs://'+bucket+'/dim_ready/'+name+'/'+name+'.'+save_as
    
    
    
    print(f"Creating '{name}' folder...")
    create_folder('dim_ready/',name)
    delete_files_named("placeholder.txt")
    
    print("Beginning saving process...")
    flag = 0 # this var will let us know is there as been an error
    
    #Saves Pandas dataframe
    if isinstance(df, pd.DataFrame):
        # Initialize GCS file system
        fs = gcsfs.GCSFileSystem(project='cis4400-group-project')

        # Save DataFrame to the "cleaned" folder in the GCS bucket
        with fs.open(gcs_path, 'w') as f:
            if save_as == "csv":
                df.to_csv(f, index=False)
            else:
                print("INVALD FILE FORMAT. TRY [csv]")
                flag+=1
        
        if flag == 0:
            print("Successfully saved "+name+ "!")
    
    #Saves PySpark dataframe
    elif isinstance(df, PySparkDataFrame):
        if save_as == "parquet":
            df.write.parquet(gcs_path,mode="overwrite")
        elif save_as == "csv":
            df.write.csv(gcs_path,mode="overwrite")
        elif save_as == "avro":
            df.write.format("avro").save(gcs_path,mode="overwrite")
        else:
            print("INVALD FILE FORMAT. TRY [csv] or [parquet] or [avro]")
            flag+=1
        
        if flag == 0:
            print("Successfully saved "+name+ "!")
        
    #No valid dataframe found
    else:
        print("ERROR: INVALID DATAFRAME - NO PROCEDURE APPLIED")

In [36]:
#this function accepts a list of tables, that each contain 2 elements, the table itself, and the name of the table.
#Example, table[0] = [df_1,"df_1_name"], table[1][0] = df_2, table[3][1] = "df_4_name"
def save_list_of_df(list_of_tables,pandas_save="csv",pyspark_save="parquet"):
    for table in list_of_tables:
        if isinstance(table[0], pd.DataFrame):
            save_df(table[0],table[1],save_as=pandas_save)
        elif isinstance(table[0], PySparkDataFrame):
            save_df(table[0],table[1],save_as=pyspark_save)

# Saving Dims and Facts

In [37]:
#We will simply save all Pandas dataframes as CSV's and all PySpark ones as parquet

ENABLE_SAVE = False #boolean to quickly turn on/off saving. Good to turn off saving when debugging.

if ENABLE_SAVE:
    save_list_of_df(tables)

In [38]:
# ISSUES

# There are null values in industry type when joining it on NAICS code. That is because the naics code wasnt found
# inside the dim_industry table. So no value was able to come of it. 


# DIFFERENCES BETWEEN THIS AND CURRENT DIM MODEL

# PPP Fact Table currently has "term" and "term_date_id" in the model. However...
# Script wise, I did not include term_date_id, and left term as is since it doesnt correspond to a date.