In [53]:
import pandas as pd
import re
import os

import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
from pyspark.sql.functions import unix_timestamp, udf, lit, explode, split, regexp_extract, col, isnan, isnull, desc, when, sum, to_date, desc, regexp_replace, count, to_timestamp
from pyspark.sql.types import IntegerType, TimestampType

In [3]:
#setting visualization options
# https://www.1week4.com/it/machine-learning/udacity-data-engineering-capstone-project/
pd.set_option('display.max_colwidth', -1)
pd.set_option('display.max_columns', None)  

# modify visualization of the notebook, for easier view
from IPython.core.display import display, HTML
display(HTML("""<style> p { max-width:90% !important; } h1 {font-size:2rem!important } h2 {font-size:1.6rem!important } 
h3 {font-size:1.4rem!important } h4 {font-size:1.3rem!important }h5 {font-size:1.2rem!important }h6 {font-size:1.1rem!important }</style>"""))# Do all imports and installs here


In [4]:
def create_spark_session():
    """
    This function creates a Spark Sesson and includes necessary Jar and adoop packages in the configuration. 
    """
    spark=SparkSession \
    .builder \
    .config("spark.jars.repositories", "https://repos.spark-packages.org/") \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()
    return spark

In [5]:
spark = create_spark_session()

In [59]:
I94_DATASET_PATH = '../../../../../data/18-83510-I94-Data-2016/'

filelist = os.listdir(I94_DATASET_PATH)
print(f"The dataset contains {len(filelist)} files")

The dataset contains 12 files


In [60]:
for file in filelist:
    size = os.path.getsize('{}/{}'.format(I94_DATASET_PATH, file))
    print(f'{file} - dim(bytes): {size}')

i94_apr16_sub.sas7bdat - dim(bytes): 471990272
i94_sep16_sub.sas7bdat - dim(bytes): 569180160
i94_nov16_sub.sas7bdat - dim(bytes): 444334080
i94_mar16_sub.sas7bdat - dim(bytes): 481296384
i94_jun16_sub.sas7bdat - dim(bytes): 716570624
i94_aug16_sub.sas7bdat - dim(bytes): 625541120
i94_may16_sub.sas7bdat - dim(bytes): 525008896
i94_jan16_sub.sas7bdat - dim(bytes): 434176000
i94_oct16_sub.sas7bdat - dim(bytes): 556269568
i94_jul16_sub.sas7bdat - dim(bytes): 650117120
i94_feb16_sub.sas7bdat - dim(bytes): 391905280
i94_dec16_sub.sas7bdat - dim(bytes): 523304960


#### Create a dataframe
*Note*: If this fails with `Failed to find data source: com.github.saurfang.sas.spark` then reset the Udactiy workspace

In [6]:
I94_TEST_FILE = '../../../../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat'

df_I94 = spark.read.format('com.github.saurfang.sas.spark').load(I94_TEST_FILE).persist()


#### Inspect the df

In [62]:
df_I94.limit(5).toPandas().head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,22.0,2016.0,8.0,323.0,323.0,NYC,20667.0,1.0,FL,,23.0,3.0,1.0,20160801,RID,,U,,,,1993.0,D/S,M,,EK,64510500000.0,201,F1
1,55.0,2016.0,8.0,209.0,209.0,AGA,20667.0,1.0,CA,,41.0,2.0,1.0,20160801,,,A,,,,1975.0,09142016,M,3955.0,JL,57571870000.0,941,GMT
2,56.0,2016.0,8.0,209.0,209.0,AGA,20667.0,1.0,GU,,24.0,2.0,1.0,20160801,,,A,,,,1992.0,09152016,F,3661.0,UA,57571890000.0,874,GMT
3,61.0,2016.0,8.0,213.0,213.0,CHI,20667.0,1.0,WA,20774.0,27.0,3.0,1.0,20160801,BMB,,U,O,,M,1989.0,D/S,M,,UA,59059190000.0,906,F1
4,64.0,2016.0,8.0,111.0,111.0,BOS,20667.0,1.0,MS,20670.0,34.0,2.0,1.0,20160804,,,G,O,,M,1982.0,08242016,F,32572.0,QK,61043090000.0,8456,WT


In [63]:
df_I94.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

#### Convert doubles to integers

In [64]:
# Snippet taken from https://www.1week4.com/it/machine-learning/udacity-data-engineering-capstone-project/
toInt = udf(lambda x: int(x) if x!=None else x, IntegerType())

for colname, coltype in df_I94.dtypes:
    if coltype == 'double':
        df_I94 = df_I94.withColumn(colname, toInt(colname))

#### Convert strings to dates

In [65]:
df_I94 = df_I94.withColumn('dtadfile',to_date((col("dtadfile")),"yyyyMMdd"))

In [66]:
df_I94 = df_I94.withColumn('dtaddto',to_date(col("dtaddto"),"MMddyyyy"))

#### Convert SAS epoch dates from integers to dates

In [67]:
# from https://knowledge.udacity.com/questions/66798
from datetime import datetime, timedelta
from pyspark.sql import types as T
def convert_datetime(x):
    try:
        start = datetime(1960, 1, 1)
        return start + timedelta(days=int(x))
    except:
        return None
udf_datetime_from_sas = udf(lambda x: convert_datetime(x), T.DateType())

#### df_I94.limit(5).toPandas().head()

In [69]:
# df_I94 = df_I94.withColumn('dtaddto',to_date(col("dtaddto"),"MMddyyyy")
df_I94 = df_I94.withColumn("arrdate", udf_datetime_from_sas("arrdate")) 

In [70]:
df_I94.limit(5).toPandas().head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,22,2016,8,323,323,NYC,2016-08-01,1,FL,,23,3,1,2016-08-01,RID,,U,,,,1993,,M,,EK,85986190,201,F1
1,55,2016,8,209,209,AGA,2016-08-01,1,CA,,41,2,1,2016-08-01,,,A,,,,1975,2016-09-14,M,3955.0,JL,1737294085,941,GMT
2,56,2016,8,209,209,AGA,2016-08-01,1,GU,,24,2,1,2016-08-01,,,A,,,,1992,2016-09-15,F,3661.0,UA,1737319685,874,GMT
3,61,2016,8,213,213,CHI,2016-08-01,1,WA,20774.0,27,3,1,2016-08-01,BMB,,U,O,,M,1989,,M,,UA,-1070347714,906,F1
4,64,2016,8,111,111,BOS,2016-08-01,1,MS,20670.0,34,2,1,2016-08-04,,,G,O,,M,1982,2016-08-24,F,32572.0,QK,913547189,8456,WT


In [71]:
df_I94 = df_I94.withColumn("depdate", udf_datetime_from_sas("depdate")) 

In [72]:
df_I94.limit(5).toPandas().head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,22,2016,8,323,323,NYC,2016-08-01,1,FL,,23,3,1,2016-08-01,RID,,U,,,,1993,,M,,EK,85986190,201,F1
1,55,2016,8,209,209,AGA,2016-08-01,1,CA,,41,2,1,2016-08-01,,,A,,,,1975,2016-09-14,M,3955.0,JL,1737294085,941,GMT
2,56,2016,8,209,209,AGA,2016-08-01,1,GU,,24,2,1,2016-08-01,,,A,,,,1992,2016-09-15,F,3661.0,UA,1737319685,874,GMT
3,61,2016,8,213,213,CHI,2016-08-01,1,WA,2016-11-16,27,3,1,2016-08-01,BMB,,U,O,,M,1989,,M,,UA,-1070347714,906,F1
4,64,2016,8,111,111,BOS,2016-08-01,1,MS,2016-08-04,34,2,1,2016-08-04,,,G,O,,M,1982,2016-08-24,F,32572.0,QK,913547189,8456,WT


In [73]:
df_I94.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: date (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- dtadfile: date (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: date (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: integer (nullable

In [74]:
df_I94.select("gender").groupBy("gender").count().show()

+------+-------+
|gender|  count|
+------+-------+
|     F|1930751|
|  null| 233112|
|     M|1936845|
|     U|   2514|
|     X|    348|
+------+-------+



In [15]:
df_I94.select("i94port").groupBy("i94port").count().orderBy(desc('count')).show()

+-------+------+
|i94port| count|
+-------+------+
|    NYC|696609|
|    LOS|466224|
|    MIA|439593|
|    SFR|249132|
|    HHW|218639|
|    CHI|213908|
|    NEW|183121|
|    ORL|178448|
|    BOS|116619|
|    ATL|116060|
|    WAS|115528|
|    HOU|114214|
|    AGA|114174|
|    DAL| 91816|
|    FTL| 84006|
|    SEA| 76597|
|    LVG| 75121|
|    DET| 60681|
|    SAI| 52872|
|    PHI| 41332|
+-------+------+
only showing top 20 rows



In [110]:
df_I94.select("biryear").groupBy("biryear").count().show()

+-------+-----+
|biryear|count|
+-------+-----+
|   1959|44392|
|   1990|61346|
|   1975|64323|
|   1977|61655|
|   1924|  179|
|   2003|41252|
|   2007|34049|
|   1974|66670|
|   2015|14362|
|   1927|  486|
|   1955|35212|
|   2006|35340|
|   1978|60687|
|   1925|  227|
|   1961|49941|
|   2013|14677|
|   1942|11013|
|   1939| 7328|
|   1944|14304|
|   null|   31|
+-------+-----+
only showing top 20 rows



In [112]:
min_by=df_I94.agg({"biryear": "min"}).collect()[0][0]
max_by=df_I94.agg({"biryear": "max"}).collect()[0][0]
print(f'The oldest arrival was born in {min_by} and the youngest in {max_by}')

The oldest arrival was born in 1911 and the youngest in 2016


In [76]:
df_I94.select("airline").groupBy("airline").count().show()

+-------+------+
|airline| count|
+-------+------+
|     DZ|     1|
|    01B|     1|
|    926|     3|
|     CI| 26072|
|      7|     2|
|     TC|     1|
|     FI| 17935|
|     AZ| 25670|
|     IC|     1|
|    78B|     1|
|     UA|380789|
|     EA|  2012|
|     Q7|    20|
|     VP|     1|
|    743|    34|
|    FYG|     4|
|     3M|  1278|
|    YEA|     4|
|     RO|     1|
|     SL|     4|
+-------+------+
only showing top 20 rows



In [77]:
df_I94.select("arrDate").groupBy("arrDate").count().show()

+----------+------+
|   arrDate| count|
+----------+------+
|2016-08-15|130941|
|2016-08-31| 98063|
|2016-08-23|124442|
|2016-08-26|127757|
|2016-08-01|147570|
|2016-08-16|126259|
|2016-08-06|150617|
|2016-08-05|152439|
|2016-08-20|143306|
|2016-08-03|139811|
|2016-08-12|148702|
|2016-08-19|146621|
|2016-08-10|135493|
|2016-08-13|145134|
|2016-08-30| 90944|
|2016-08-07|137923|
|2016-08-27|126097|
|2016-08-18|143132|
|2016-08-04|147395|
|2016-08-21|133151|
+----------+------+
only showing top 20 rows



#### Get max and min arrDates

In [78]:
df_I94.agg({"arrDate": "max"}).collect()[0][0]

datetime.date(2016, 8, 31)

In [79]:
df_I94.agg({"arrDate": "min"}).collect()[0][0]

datetime.date(2016, 8, 1)

#### Get max and min depDates

In [107]:
df_I94.agg({"depDate": "max"}).collect()[0][0]

datetime.date(2016, 11, 22)

In [108]:
df_I94.agg({"depDate": "min"}).collect()[0][0]

datetime.date(2016, 8, 2)

#### Find cases where dep data is before arrival date

In [100]:
df_I94.where(col('arrDate') > col('depDate')).count()

564

#### Drop rows where the arrival data is after the departure date

In [105]:
df_I94=df_I94.where(col('arrDate') <= col('depDate'))

#### Count and delete duplicates

In [106]:
count_before=df_I94.count()
df_I94 = df_I94.drop_duplicates()
count_after=df_I94.count()
print(f'{count_before-count_after} duplicate rows dropped (out of {count_before})')

0 duplicate rows dropped (out of 3451218)


#### What kind of DF is this?

In [92]:
if isinstance(df, pd.DataFrame):
    print('pandas')
else:
    print('spark')

spark


#### Find Nulls

In [104]:
# https://stackoverflow.com/questions/44627386/how-to-find-count-of-null-and-nan-values-for-each-column-in-a-pyspark-dataframe
from pyspark.sql.functions import isnan, when, count, col

df_I94.select([count(when(isnull(c), c)).alias(c) for c in df_I94.columns]).show()


+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender| insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|    0|    0|     0|  6142|     0|      0|      0|   1884| 184462| 651788|   816|      0|    0|       0| 2378159|4061341|     18| 639255|4099524| 637603|    816| 451416|233112|3573888| 145501|     0|23238|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+---

#### Deal with Nulls

#### Investigate join to airport_codes

In [7]:
df_airport_codes = spark.read.csv('../../../airport-codes_csv.csv', sep=',', inferSchema=True, header=True)

In [11]:
df_I94.join(df_airport_codes, df_I94.i94port == df_airport_codes.local_code,"left").count()

4761576

In [12]:
df_I94.join(df_airport_codes, df_I94.i94port == df_airport_codes.local_code).count()

3279549

In [13]:
df_I94.join(df_airport_codes, df_I94.i94port == df_airport_codes.local_code,"left").where(isnull("local_code"))

1482027

#### Check the cities where there is no match between local codes and port code
*Hint* It ain't good news.  I don't think the airport_codes datset is going to be much use

In [18]:
df_I94.join(df_airport_codes, df_I94.i94port == df_airport_codes.local_code,"left") \
.where(isnull("local_code")) \
.select("i94port") \
.groupBy("i94port") \
.count() \
.orderBy(desc('count')).show()

+-------+------+
|i94port| count|
+-------+------+
|    NYC|696609|
|    CHI|213908|
|    WAS|115528|
|    AGA|114174|
|    FTL| 84006|
|    LVG| 75121|
|    PHI| 41332|
|    SPM| 19948|
|    YHC| 18036|
|    SAJ| 14273|
|    PBB| 13053|
|    TAM| 12608|
|    SNJ| 10531|
|    POO|  9860|
|    WPB|  5937|
|    XXX|  5109|
|    DER|  3356|
|    THO|  2951|
|    X96|  2507|
|    SYS|  2481|
+-------+------+
only showing top 20 rows



#### Now check What we find in I94_SAS_Labels_Descriptions

In [179]:
text_file = 'I94_SAS_Labels_Descriptions.SAS'

# wholetext=true means we read the file into a singel row - handier for regex
df_label_full = spark.read.text(text_file, wholetext=True)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:41849)
Traceback (most recent call last):
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:41849)

In [160]:
df_label_full.printSchema()

root
 |-- value: string (nullable = true)



In [161]:
pattern='(\$i94prtl)([^;]+)'

In [164]:

df_new = df_label_full.withColumn('I94PORT', regexp_extract(col('value'),pattern,2))


In [173]:
df_new = df_new.withColumn('port',explode(split('I94PORT','[\r\n]+'))).drop('value').drop('I94PORT')
# df_new.toPandas().head()

In [174]:
df_I94_code = df_new.withColumn('code',regexp_extract(col('port'),"(?<=')[0-9A-Z. ]+(?=')",0)) \
    .withColumn('city_state',regexp_extract(col('port'),"(=\t')([0-9A-Za-z ,\-()\/\.#&]+)(')",2)) \
    .withColumn('city', split(col('city_state'),',').getItem(0)) \
    .withColumn('state', split(col('city_state'),',').getItem(1)) \
    .withColumn('state', regexp_replace(col('state'), ' *$', '')) \
    .where(col('port')!='') \
    .drop('port') \

In [178]:
df_I94_code.show()

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:41849)
Traceback (most recent call last):
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:41849)

In [None]:
df_I94.join(df_I94_code, df_I94.i94port == df_I94_code.code,"left") \
.where(isnull("code")) \
.select("i94port") \
.groupBy("i94port") \
.count() \
.orderBy(desc('count')).show()

Py4JJavaError: An error occurred while calling o1573.showString.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:146)
	at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:387)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:117)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:259)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:102)
	at org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:216)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:187)
	at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:374)
	at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:403)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:374)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:96)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:40)
	at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:125)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:85)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:654)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:166)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:40)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:544)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:598)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:151)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:136)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.GeneratedMethodAccessor76.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 105.0 failed 1 times, most recent failure: Lost task 0.0 in stage 105.0 (TID 1706, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3236)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:159)
	at java.io.DataOutputStream.writeInt(DataOutputStream.java:197)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:257)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:306)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:79)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:76)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withExecutionId$1.apply(SQLExecution.scala:101)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:98)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:75)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:75)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3236)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:159)
	at java.io.DataOutputStream.writeInt(DataOutputStream.java:197)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:257)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	... 3 more


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 36808)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
Traceback (most recent call last):
  File "/opt/conda/lib/py

#### Add a new row for the missing airport

In [177]:
columns = ['code', 'city_state', 'city','state']
vals = [('OCA','Ocean Reef Club, FL','Ocean Reef Club', 'FL' )]

df_OCA = spark.createDataFrame(vals, columns)

df_I94_code = df_I94_code.union(df_OCA)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving


Py4JError: An error occurred while calling o20.sc

In [None]:
df_I94.join(df_I94_code, df_I94.i94port == df_I94_code.code,"left") \
.where(isnull("code")) \
.select("i94port") \
.groupBy("i94port") \
.count() \
.orderBy(desc('count')).show()

> /* I94VISA - Visa codes collapsed into three categories:
   1 = Business
   2 = Pleasure
   3 = Student
*/

In [144]:
columns = ['I94VISA', 'category']
vals = [(1,'Business'),(2,'Pleasure'),(3,'Student')]

df_I94VISA = spark.createDataFrame(vals, columns)

In [145]:
df_I94VISA.show()

+-------+--------+
|I94VISA|category|
+-------+--------+
|      1|Business|
|      2|Pleasure|
|      3| Student|
+-------+--------+



In [153]:
pattern='(i94cntyl)([^;]+)'

In [156]:

df_new = df_label_full.withColumn('I94RES', regexp_extract(col('value'),pattern,2))


In [158]:
# df_new.toPandas().head()

In [105]:
# df_new = df_new.withColumn('port',explode(split('I94PORT','[\r\n]+'))).drop('value').drop('I94PORT')
df_new = df_new.withColumn('country',explode(split('I94RES','[\r\n]+'))).drop('value').drop('I94RES')
df_new.toPandas().head()

Unnamed: 0,port
0,
1,"'ALC'\t=\t'ALCAN, AK '"
2,"'ANC'\t=\t'ANCHORAGE, AK '"
3,"'BAR'\t=\t'BAKER AAF - BAKER ISLAND, AK'"
4,"'DAC'\t=\t'DALTONS CACHE, AK '"


In [133]:
df_I94RES = df_new.withColumn('code',regexp_extract(col('port'),"(?<=')[0-9A-Z. ]+(?=')",0)) \
    .withColumn('city_state',regexp_extract(col('port'),"(=\t')([0-9A-Za-z ,\-()\/\.#&]+)(')",2)) \
    .withColumn('city', split(col('city_state'),',').getItem(0)) \
    .withColumn('state', split(col('city_state'),',').getItem(1)) \
    .withColumn('state', regexp_replace(col('state'), ' *$', '')) \
    .where(col('port')!='') \
    .drop('port') \

In [134]:
df_I94_code.show()

+----+--------------------+--------------------+-----+
|code|          city_state|                city|state|
+----+--------------------+--------------------+-----+
| ALC|ALCAN, AK        ...|               ALCAN|   AK|
| ANC|ANCHORAGE, AK    ...|           ANCHORAGE|   AK|
| BAR|BAKER AAF - BAKER...|BAKER AAF - BAKER...|   AK|
| DAC|DALTONS CACHE, AK...|       DALTONS CACHE|   AK|
| PIZ|DEW STATION PT LA...|DEW STATION PT LA...|   AK|
| DTH|DUTCH HARBOR, AK ...|        DUTCH HARBOR|   AK|
| EGL|EAGLE, AK        ...|               EAGLE|   AK|
| FRB|FAIRBANKS, AK    ...|           FAIRBANKS|   AK|
| HOM|HOMER, AK        ...|               HOMER|   AK|
| HYD|HYDER, AK        ...|               HYDER|   AK|
| JUN|JUNEAU, AK       ...|              JUNEAU|   AK|
| 5KE|       KETCHIKAN, AK|           KETCHIKAN|   AK|
| KET|KETCHIKAN, AK    ...|           KETCHIKAN|   AK|
| MOS|MOSES POINT INTER...|MOSES POINT INTER...|   AK|
| NIK|NIKISKI, AK      ...|             NIKISKI|   AK|
| NOM|NOM,

In [135]:
df_I94.join(df_I94_code, df_I94.i94port == df_I94_code.code,"left") \
.where(isnull("code")) \
.select("i94port") \
.groupBy("i94port") \
.count() \
.orderBy(desc('count')).show()

+-------+-----+
|i94port|count|
+-------+-----+
|    OCA|    1|
+-------+-----+

