# Import

In [1]:
import pyspark
from pyspark.sql.functions import isnan, when, count, col, isnull, avg, round, udf, to_date, when
from pyspark.sql.types import DateType
from datetime import datetime, timedelta

In [2]:
def spark_shape(self):
    return (self.count(), len(self.columns))
pyspark.sql.dataframe.DataFrame.shape = spark_shape
# https://stackoverflow.com/questions/39652767/how-to-find-the-size-or-shape-of-a-dataframe-in-pyspark

In [3]:
import configparser
import os
config = configparser.ConfigParser()
config.read('dwh.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['KEY']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['SECRET']

In [4]:
def sas_date_to_date(sas_date):
    if (sas_date):
        init_date = datetime(1960, 1, 1)
        return init_date + timedelta(days=sas_date)
    return None
udf_sas_date_to_date = udf(lambda x: sas_date_to_date(x), DateType())

In [5]:
def yyyyMMdd_date_to_date(date):
    try:
        return to_date(date, 'yyyyMMdd')
    except ValueError as e:
        return None

In [6]:
def MMddyyyy_date_to_date(date):
    try:
        today = datetime.now().today().strftime('%Y-%m-%d')
        return when( (to_date(date, 'MMddyyyy') > "1990-01-01") & (to_date(date, 'MMddyyyy') < today), to_date(date, 'MMddyyyy')).otherwise(None)
    #https://robertjblackburn.com/how-to-clean-bad-dates-moving-to-spark-3/
    except ValueError as e:
        return None

## Start spark

In [22]:
from pyspark.sql import SparkSession 
spark = SparkSession.builder.\
        config("spark.executor.memory", "32g").\
        config("spark.jars.packages","saurfang:spark-sas7bdat:3.0.0-s_2.12,org.apache.hadoop:hadoop-aws:3.1.2").\
        config("spark.hadoop.fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID']).\
        config("spark.jars.packages2","org.apache.hadoop.fs.s3a.S3AFileSystem").\
        config("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])\
        .enableHiveSupport().getOrCreate()

## Immigration data

### CREATE DF

In [8]:
months = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']

In [9]:
d = {}
for month in months:
    d[month+'16'] = spark.read.format('com.github.saurfang.sas.spark').load(f'data/i94_{month}16_sub.sas7bdat')
# https://stackoverflow.com/questions/30635145/create-multiple-dataframes-in-loop/30638956

In [10]:
%%time
count_row = 0
df_row = {}
for name, df in d.items():
    row, col = df.shape()
    count_row = count_row + row
    print(name, row, col)
    df_row[name] = row
print(count_row)

21/08/02 23:46:24 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

jan16 2847924 28


                                                                                

feb16 2570543 28


                                                                                

mar16 3157072 28


                                                                                

apr16 3096313 28


                                                                                

may16 3444249 28


                                                                                

jun16 3574989 34


                                                                                

jul16 4265031 28


                                                                                

aug16 4103570 28


                                                                                

sep16 3733786 28


                                                                                

oct16 3649136 28


                                                                                

nov16 2914926 28




dec16 3432990 28
40790529
CPU times: user 136 ms, sys: 53.5 ms, total: 189 ms
Wall time: 1min 32s


                                                                                

### Drop excess column because d['jun16'] has 34 columns.

In [11]:
d['jun16'].shape()

                                                                                

(3574989, 34)

In [35]:
d['jun16'] = d['jun16'].drop('validres','delete_days','delete_mexl','delete_dup','delete_visa','delete_recdup') 

### CHECK NULL

In [13]:
d_count_null = {}
for name, df in d.items():
    d_count_null[name] = df.select([round((count(when(isnull(c), c))/df_row[name]*100),2).alias(c) for c in df.columns])

In [14]:
df_null_all = d_count_null['jan16']
for name, df in d.items():
    df_null_all = df_null_all.union(d_count_null[name])

In [15]:
%%time
df_null_all.show()



+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|  0.0|  0.0|   0.0|   0.0|   0.0|    0.0|    0.0|    0.0|   6.22|  18.35|  0.04|    0.0|  0.0|    3.18|   48.68| 98.4|    0.0|  18.32|  100.0|  18.32|   0.04|   0.02|  7.62| 95.13|   2.15|   0.0| 0.43|     0.0|
|  0.0|  0.0|   0.0|   0.0|   0.0|    0.0|    0.0|    0.0|   6.22|  18.35|  0.04|    0.0|  0.0|    3.18|   48.68| 98.4|    0.0|  18.32|  100.0|  18.32| 

                                                                                

## DROP 
### "occup" because too many null and we cannot conclude that people do not have a jobs.
### "count" because it is used for aggregation. we dont need it anymore.
### "i94bir" because it is relative.

In [36]:
for name, df in d.items():
    d[name] = d[name].drop('occup', 'visapost', 'insnum', 'count', "i94bir") ## , 'entdepu' i94bir  is relative

In [17]:
d['jun16'].printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



## Write immigration data to s3

In [37]:
for month in months:
#     d[month+'16'] = spark.read.format('com.github.saurfang.sas.spark').load(f'data/i94_{month}16_sub.sas7bdat')
    d[month+'16'] = d[month+'16'].withColumn("arrdate",udf_sas_date_to_date(d[month+'16'].arrdate))
    d[month+'16'] = d[month+'16'].withColumn("depdate",udf_sas_date_to_date(d[month+'16'].depdate).cast(DateType()))
    d[month+'16'] = d[month+'16'].withColumn("dtadfile", yyyyMMdd_date_to_date(d[month+'16'].dtadfile))
    d[month+'16'] = d[month+'16'].withColumn("dtaddto", MMddyyyy_date_to_date(d[month+'16'].dtaddto))
    
    d[month+'16'] = d[month+'16'].withColumn("i94yr_partition",d[month+'16'].i94yr)
    d[month+'16'] = d[month+'16'].withColumn("i94mon_partition",d[month+'16'].i94mon)
    d[month+'16'] = d[month+'16'].withColumn("arrdate_partition",d[month+'16'].arrdate)

In [19]:
d['aug16'].filter(d['aug16'].dtaddto < "1990-01-01").show() ## filter date before 1990 because s3 cannot be written with invalid date value.



+-----+-----+------+------+------+-------+-------+-------+-------+-------+-------+--------+-------+-------+-------+-------+-------+-------+------+-------+------+-----+--------+---------------+----------------+-----------------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94visa|dtadfile|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|airline|admnum|fltno|visatype|i94yr_partition|i94mon_partition|arrdate_partition|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+-------+--------+-------+-------+-------+-------+-------+-------+------+-------+------+-----+--------+---------------+----------------+-----------------+
+-----+-----+------+------+------+-------+-------+-------+-------+-------+-------+--------+-------+-------+-------+-------+-------+-------+------+-------+------+-----+--------+---------------+----------------+-----------------+



                                                                                

In [None]:
d['aug16'].groupBy("dtaddto").count().orderBy('dtaddto', ascending=False).show(1,truncate=False) ## find which date is before 1990

In [None]:
d[month_str+year].write.partitionBy("i94yr","i94mon","arrdate").mode("append").parquet("s3a://ohmohmprde/" + "df_immigration")

### FIND duplicated value from all months 

In [46]:
for month in months:
    d[month+'16'] = spark.read.format('com.github.saurfang.sas.spark').load(f'data/i94_{month}16_sub.sas7bdat')

In [47]:
for name, df in d.items():
    d[name] = d[name].drop('occup', 'visapost', 'insnum', 'count', "i94bir") ## , 'entdepu' i94bir  is relative

In [48]:
d['jun16'] = d['jun16'].drop('validres','delete_days','delete_mexl','delete_dup','delete_visa','delete_recdup') 

In [49]:
from pyspark.sql.types import StructType,StructField, StringType, DoubleType
data2 = [(1.0, 1.0, 1.0,1.0, 1.0, "test", \
          1.0, 1.0, "test", 1.0, 1.0, "test", \
          "test", "test", "test", "test", 1.0, "test", \
          "test", "test", 1.0, "test", "test"),
  ]

schema = StructType([ \
    StructField("cicid",DoubleType(),True), \
    StructField("i94yr",DoubleType(),True), \
    StructField("i94mon",DoubleType(),True), \
    StructField("i94cit", DoubleType(), True), \
    StructField("i94res", DoubleType(), True), \
    StructField("i94port", StringType(), True), \

    StructField("arrdate",DoubleType(),True), \
    StructField("i94mode",DoubleType(),True), \
    StructField("i94addr",StringType(),True), \
    StructField("depdate", DoubleType(), True), \
    StructField("i94visa", DoubleType(), True), \
    StructField("dtadfile", StringType(), True), \
                     
    StructField("entdepa",StringType(),True), \
    StructField("entdepd",StringType(),True), \
    StructField("entdepu",StringType(),True), \
    StructField("matflag", StringType(), True), \
    StructField("biryear", DoubleType(), True), \
    StructField("dtaddto", StringType(), True), \
                     
    StructField("gender",StringType(),True), \
    StructField("airline",StringType(),True), \
    StructField("admnum",DoubleType(),True), \
    StructField("fltno", StringType(), True), \
    StructField("visatype", StringType(), True), \
  ])
 
df_concat2 = spark.createDataFrame(data=data2,schema=schema)

In [50]:
for name, df in d.items():
    df_concat2 = df_concat2.union(d[name])

In [51]:
df_concat2.count()

                                                                                

40790530

In [52]:
df_concat2.select("admnum").distinct().count()

                                                                                

40273677

In [53]:
df_concat2.select("cicid").distinct().count()

                                                                                

7380352

In [54]:
df_concat2.groupBy("admnum","cicid").count().orderBy('count', ascending=False).show(100,truncate=False)



+--------------+---------+-----+
|admnum        |cicid    |count|
+--------------+---------+-----+
|9.451269885E9 |1668559.0|2    |
|3.09865585E8  |36105.0  |1    |
|3.12757685E8  |336.0    |1    |
|4.42326685E8  |649.0    |1    |
|3.31405385E8  |978.0    |1    |
|4.22004085E8  |1049.0   |1    |
|3.38600285E8  |1223.0   |1    |
|4.16907485E8  |1712.0   |1    |
|3.37632285E8  |1783.0   |1    |
|3.86940785E8  |2061.0   |1    |
|3.04249085E8  |2190.0   |1    |
|3.60613085E8  |2355.0   |1    |
|3.85515285E8  |2674.0   |1    |
|3.34743985E8  |3130.0   |1    |
|3.91743085E8  |4109.0   |1    |
|4.08960985E8  |4492.0   |1    |
|3.00697085E8  |4532.0   |1    |
|3.81505885E8  |4554.0   |1    |
|3.32396285E8  |4915.0   |1    |
|4.29965385E8  |5022.0   |1    |
|4.07627085E8  |5317.0   |1    |
|4.05769085E8  |5368.0   |1    |
|3.08373185E8  |5387.0   |1    |
|3.76426585E8  |5513.0   |1    |
|3.03877985E8  |5517.0   |1    |
|4.29362185E8  |5940.0   |1    |
|3.35306785E8  |6609.0   |1    |
|4.3099108

                                                                                

In [55]:
df_concat2.filter((df_concat2.admnum == 9.451269885E9) & (df_concat2.cicid == 1668559.0)).show()



+---------+------+------+------+------+-------+-------+-------+-------+-------+-------+--------+-------+-------+-------+-------+-------+--------+------+-------+-------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94visa|dtadfile|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|airline|       admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+-------+--------+-------+-------+-------+-------+-------+--------+------+-------+-------------+-----+--------+
|1668559.0|2016.0|   9.0| 438.0| 438.0|    NYC|20705.0|    1.0|     NY|20728.0|    2.0|20160908|      O|      O|   null|      M| 1990.0|12062016|     F|     TK|9.451269885E9|    3|      WT|
|1668559.0|2016.0|  10.0| 438.0| 438.0|    CHI|20735.0|    1.0|     IL|20767.0|    2.0|20161008|      H|      O|   null|      M| 1990.0|12062016|     F|     RS|9.451269885E9|07595|      WT|
+---------+------+------+------+------+-------+---

                                                                                

In [56]:
df_concat2.select("cicid","i94yr","i94mon").distinct().count()

                                                                                

40790530

## DESCRIPTION

In [9]:
import pandas as pd

In [11]:
with open('data/I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')

In [12]:
def code_mapper(file, idx):
    f_content2 = f_content[f_content.index(idx):]
    f_content2 = f_content2[:f_content2.index(';')].split('\n')
    f_content2 = [i.replace("'", "") for i in f_content2]
    dic = [i.split('=') for i in f_content2[1:]]
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    return dic
# https://knowledge.udacity.com/questions/125439

In [13]:
i94cit_res = code_mapper(f_content, "i94cntyl")
i94port = code_mapper(f_content, "i94prtl")
i94mode = code_mapper(f_content, "i94model")
i94addr = code_mapper(f_content, "i94addrl")
i94visa = {'1':'Business',
'2': 'Pleasure',
'3' : 'Student'}

In [64]:
i94cit_res_df = pd.DataFrame(i94cit_res.items(), columns=['City_Code', 'City'])

In [65]:
i94cit_res_df

Unnamed: 0,City_Code,City
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA
...,...,...
284,791,No Country Code (791)
285,849,No Country Code (849)
286,914,No Country Code (914)
287,944,No Country Code (944)


In [68]:
pd.DataFrame(i94cit_res_df).to_csv('stage/i94cit_res_df.csv', index=False)

## Port

In [14]:
i94port_df = pd.DataFrame(i94port.items(), columns=['Port_Code', 'Port'])

In [15]:
i94port_df

Unnamed: 0,Port_Code,Port
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"
2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,"DALTONS CACHE, AK"
4,PIZ,"DEW STATION PT LAY DEW, AK"
...,...,...
655,ADU,No PORT Code (ADU)
656,AKT,No PORT Code (AKT)
657,LIT,No PORT Code (LIT)
658,A2A,No PORT Code (A2A)


In [16]:
pd.DataFrame(i94port_df).to_csv('stage/i94port_df.csv', index=False)

In [17]:
port_df = spark.read.option("header",True).csv("stage/i94port_df.csv")

In [18]:
port_df = port_df.withColumnRenamed("Port_Code","port_code")\
                .withColumnRenamed("Port","port")

In [24]:
port_df.write.mode("overwrite").option("header","true").parquet("s3a://ohmohmprde/" + "df_i94port")

                                                                                

In [None]:
port_df = spark.read.parquet("s3a://ohmohmprde/df_i94port")

In [93]:
port_df.printSchema()

root
 |-- port_code: string (nullable = true)
 |-- port: string (nullable = true)



In [94]:
port_df.show()

[Stage 71:>                                                         (0 + 1) / 1]

+---------+--------------------+
|port_code|                port|
+---------+--------------------+
|      ALC|           ALCAN, AK|
|      ANC|       ANCHORAGE, AK|
|      BAR|BAKER AAF - BAKER...|
|      DAC|   DALTONS CACHE, AK|
|      PIZ|DEW STATION PT LA...|
|      DTH|    DUTCH HARBOR, AK|
|      EGL|           EAGLE, AK|
|      FRB|       FAIRBANKS, AK|
|      HOM|           HOMER, AK|
|      HYD|           HYDER, AK|
|      JUN|          JUNEAU, AK|
|      5KE|       KETCHIKAN, AK|
|      KET|       KETCHIKAN, AK|
|      MOS|MOSES POINT INTER...|
|      NIK|         NIKISKI, AK|
|      NOM|             NOM, AK|
|      PKC|     POKER CREEK, AK|
|      ORI|  PORT LIONS SPB, AK|
|      SKA|         SKAGWAY, AK|
|      SNP| ST. PAUL ISLAND, AK|
+---------+--------------------+
only showing top 20 rows



                                                                                

## Mode

In [70]:
i94mode_df = pd.DataFrame(i94mode.items(), columns=['Transport_Code', 'Transport_Type'])

In [71]:
i94mode_df

Unnamed: 0,Transport_Code,Transport_Type
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


In [72]:
pd.DataFrame(i94mode_df).to_csv('stage/i94mode_df.csv', index=False)

## ADDR

In [73]:
i94addr_df = pd.DataFrame(i94addr.items(), columns=['abbreviation_code', 'fullname_state'])

In [None]:
i94addr_df(2)

In [75]:
pd.DataFrame(i94addr_df).to_csv('stage/i94addr_df.csv', index=False)

In [83]:
addr_df = spark.read.option("header",True).csv("stage/i94addr_df.csv")

In [None]:
addr_df.write.mode("overwrite").option("header","true").parquet("s3a://ohmohmprde/" + "df_i94addr")

In [84]:
addr_df = spark.read.parquet("s3a://ohmohmprde/df_i94addr")

21/08/03 00:25:54 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [85]:
addr_df.show()

[Stage 66:>                                                         (0 + 1) / 1]

+-----------------+-----------------+
|abbreviation_code|   fullname_state|
+-----------------+-----------------+
|               AL|          ALABAMA|
|               AK|           ALASKA|
|               AZ|          ARIZONA|
|               AR|         ARKANSAS|
|               CA|       CALIFORNIA|
|               CO|         COLORADO|
|               CT|      CONNECTICUT|
|               DE|         DELAWARE|
|               DC|DIST. OF COLUMBIA|
|               FL|          FLORIDA|
|               GA|          GEORGIA|
|               GU|             GUAM|
|               HI|           HAWAII|
|               ID|            IDAHO|
|               IL|         ILLINOIS|
|               IN|          INDIANA|
|               IA|             IOWA|
|               KS|           KANSAS|
|               KY|         KENTUCKY|
|               LA|        LOUISIANA|
+-----------------+-----------------+
only showing top 20 rows



                                                                                

In [86]:
addr_df.printSchema()

root
 |-- abbreviation_code: string (nullable = true)
 |-- fullname_state: string (nullable = true)



## VISA

In [76]:
i94visa_df = pd.DataFrame(i94visa.items(), columns=['Visa_Code', 'Reason'])

In [77]:
i94visa_df

Unnamed: 0,Visa_Code,Reason
0,1,Business
1,2,Pleasure
2,3,Student


In [78]:
pd.DataFrame(i94visa_df).to_csv('stage/i94visa_df.csv', index=False)

In [79]:
visa_df = spark.read.option("header",True).csv("stage/i94visa_df.csv")

In [80]:
visa_df = visa_df.withColumnRenamed("Visa_Code","visa_code")\
                .withColumnRenamed("Reason","reason")

In [None]:
visa_df.write.mode("overwrite").option("header","true").parquet("s3a://ohmohmprde/" + "df_i94visa")

In [None]:
visa_df = spark.read.parquet("s3a://ohmohmprde/df_i94visa")

In [81]:
visa_df.show()

+---------+--------+
|visa_code|  reason|
+---------+--------+
|        1|Business|
|        2|Pleasure|
|        3| Student|
+---------+--------+



In [82]:
visa_df.printSchema()

root
 |-- visa_code: string (nullable = true)
 |-- reason: string (nullable = true)



# Production DEMOGRAPHICS

In [None]:
from pyspark.sql.types import IntegerType

In [None]:
demographics_df = spark.read.option("header",True).options(delimiter=';').csv("data/us-cities-demographics.csv") 

In [None]:
demographics_df.filter(demographics_df["State Code"] == "TN").sort(demographics_df["City"].asc()).show()

In [None]:
demographics_df_main = demographics_df.select("State Code", "State", "City", "Male Population", "Female Population", "Total Population", "Number of Veterans", "Foreign-born")

In [None]:
demographics_df_main = demographics_df_main.dropDuplicates()

In [None]:
demographics_df_main.count() ## select only city and state 

In [None]:
demographics_df_main.filter(demographics_df_main["State Code"] == "TN").sort(demographics_df_main["City"].asc()).show()

In [None]:
demographics_df_main = demographics_df_main.withColumn("Male Population", demographics_df_main["Male Population"].cast(IntegerType()))\
                                        .withColumn("Female Population", demographics_df_main["Female Population"].cast(IntegerType()))\
                                        .withColumn("Total Population", demographics_df_main["Total Population"].cast(IntegerType()))\
                                        .withColumn("Number of Veterans", demographics_df_main["Number of Veterans"].cast(IntegerType()))\
                                        .withColumn("Foreign-born", demographics_df_main["Foreign-born"].cast(IntegerType()))

In [None]:
demographics_df_main = demographics_df_main.groupBy("State Code").sum("Total Population","Male Population", "Female Population", "Number of Veterans",  "Foreign-born")

In [None]:
demographics_df_main = demographics_df_main.withColumnRenamed("State Code","state_code")\
                                        .withColumnRenamed("State","state")\
                                        .withColumnRenamed("City","city")\
                                        .withColumnRenamed("sum(Total Population)","total_population")\
                                        .withColumnRenamed("sum(Male Population)","male_population")\
                                        .withColumnRenamed("sum(Female Population)","female_population")\
                                        .withColumnRenamed("sum(Number of Veterans)","number_of_veterans")\
                                        .withColumnRenamed("sum(Foreign-born)","foreign_born")

In [None]:
demographics_df_main.sort(demographics_df_main["state_code"].asc()).show()

In [None]:
demographics_df_main.groupBy().sum("total_population").show()

In [None]:
demographics_df_race = demographics_df.select("State Code", "City", "State", "Race", "Count")

In [None]:
demographics_df_race = demographics_df_race.withColumnRenamed("State Code","state_code_r")\
                                        .withColumnRenamed("City","city_r")\
                                        .withColumnRenamed("State","state_r")\
                                        .withColumnRenamed("Race","race")\
                                        .withColumnRenamed("Count","count")

In [None]:
demographics_df_race.filter(demographics_df_race["state_code_r"] == "TN").sort(demographics_df_race["city_r"].asc()).show()

In [None]:
demographics_df_race.count()

In [None]:
demographics_df_race = demographics_df_race.withColumn("count", demographics_df_race["count"].cast(IntegerType()))

In [None]:
demographics_df_race = demographics_df_race.groupBy("state_code_r").pivot("race").sum("count")

In [None]:
demographics_df_race.sort(demographics_df_race["state_code_r"].asc()).show()

In [None]:
demographics_df_by_state = demographics_df_main.join(demographics_df_race,demographics_df_main["state_code"] ==  demographics_df_race["state_code_r"],"inner")

In [None]:
demographics_df_by_state = demographics_df_by_state.drop("state_code_r")

In [None]:
demographics_df_by_state = demographics_df_by_state.withColumnRenamed("American Indian and Alaska Native","american_indian_and_alaska_native")\
                                        .withColumnRenamed("Asian","asian")\
                                        .withColumnRenamed("Black or African-American","black_or_african_american")\
                                        .withColumnRenamed("Hispanic or Latino","hispanic_or_latino")\
                                        .withColumnRenamed("White","white")

In [None]:
demographics_df_by_state.show()

In [None]:
demographics_df_by_state.count()

In [None]:
demographics_df_by_state.printSchema()

In [None]:
demographics_df_by_state = demographics_df_by_state.sort(demographics_df_by_state["state_code"].asc())

In [None]:
demographics_df_by_state.coalesce(1).write.mode('overwrite').csv('demographics_df_by_state')

In [None]:
demographics_df_by_state.write.mode("overwrite").option("header","true").parquet("s3a://ohmohmprde/" + "df_demo")

## test read s3

In [25]:
test_df = spark.read.parquet("s3a://ohmohmprde/df_immigration/i94yr_partition=2016.0/i94mon_partition=9.0/")

                                                                                

In [9]:
test_df = spark.read.parquet("s3a://ohmohmprde/df_i94port")

                                                                                

In [26]:
test_df.count()

                                                                                

1768505

In [10]:
test_df.printSchema()

root
 |-- port_code: string (nullable = true)
 |-- port: string (nullable = true)



In [11]:
test_df.show()

[Stage 1:>                                                          (0 + 1) / 1]

+---------+--------------------+
|port_code|                port|
+---------+--------------------+
|      ALC|           ALCAN, AK|
|      ANC|       ANCHORAGE, AK|
|      BAR|BAKER AAF - BAKER...|
|      DAC|   DALTONS CACHE, AK|
|      PIZ|DEW STATION PT LA...|
|      DTH|    DUTCH HARBOR, AK|
|      EGL|           EAGLE, AK|
|      FRB|       FAIRBANKS, AK|
|      HOM|           HOMER, AK|
|      HYD|           HYDER, AK|
|      JUN|          JUNEAU, AK|
|      5KE|       KETCHIKAN, AK|
|      KET|       KETCHIKAN, AK|
|      MOS|MOSES POINT INTER...|
|      NIK|         NIKISKI, AK|
|      NOM|             NOM, AK|
|      PKC|     POKER CREEK, AK|
|      ORI|  PORT LIONS SPB, AK|
|      SKA|         SKAGWAY, AK|
|      SNP| ST. PAUL ISLAND, AK|
+---------+--------------------+
only showing top 20 rows



                                                                                

In [None]:
test_df.sort(test_df["cicid"].desc()).show()