In [32]:
# (1) Download databricks.spark.xml library 
# (2) https://repo1.maven.org/maven2/com/databricks/spark-xml_2.12/0.12.0/spark-xml_2.12-0.12.0.jar
# (3) Upload jar file to HDFS root
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
warehouse_location = 'hdfs://namenode:8020/warehouse'
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .enableHiveSupport() \
    .getOrCreate()

In [33]:
spark.version

'3.1.1'

In [34]:
sc = spark.sparkContext

In [35]:
hdfs_path = "hdfs://namenode:8020//raw/sensors/6a2ab034-8bc9-42b1-90ae-1315f6ea6f7a"
hdfs_path2 = "hdfs://namenode:8020//raw/fails/00ffbf9a-e13b-43b8-82b2-fe3f71bd5acd"

In [36]:
multiline_df = spark.read.option("multiline","true") \
      .json((hdfs_path)).select("results")
multiline_df.show()  

+--------------------+
|             results|
+--------------------+
|[{2021-11-22T15:4...|
+--------------------+



In [37]:
multiline_df2 = spark.read.option("multiline","true") \
      .json((hdfs_path2)).select("results")
multiline_df2.show()  

+--------------------+
|             results|
+--------------------+
|[{2021-11-22T14:2...|
+--------------------+



In [38]:
from pyspark.sql.types import *

In [39]:
df2 = multiline_df.withColumn("results", explode("results")) \
    .withColumn("sensor_created_at", col("results")["sensor_created_at"].cast("timestamp")) \
    .withColumn("sensor_event_type", col("results")["sensor_event_type"].cast("string")) \
    .withColumn("sensor_id_cycle", col("results")["sensor_id_cycle"].cast("integer")) \
    .withColumn("sensor_ip", col("results")["sensor_ip"].cast("string")) \
    .withColumn("sensor_unique_id", col("results")["sensor_unique_id"].cast("string")) \
    .withColumn("sensor_value_humidity", col("results")["sensor_value_humidity"].cast("integer")) \
    .withColumn("sensor_value_temperature_motor1", col("results")["sensor_value_temperature_motor1"].cast("integer")) \
    .withColumn("sensor_value_temperature_motor2", col("results")["sensor_value_temperature_motor2"].cast("integer")) \
    .withColumn("sensor_value_temperature_motor3", col("results")["sensor_value_temperature_motor3"].cast("integer")) \
    .withColumn("sensor_value_vibrationhz_x", col("results")["sensor_value_vibrationhz_x"].cast("integer")) \
    .withColumn("sensor_value_vibrationhz_y", col("results")["sensor_value_vibrationhz_y"].cast("integer")) \
    .withColumn("sensor_value_vibrationhz_z", col("results")["sensor_value_vibrationhz_z"].cast("integer")) \
    .withColumn("value_noise_dba_motor1", col("results")["value_noise_dba_motor1"].cast("integer")) \
    .withColumn("value_noise_dba_motor2", col("results")["value_noise_dba_motor2"].cast("integer")) \
    .withColumn("value_noise_dba_motor3", col("results")["value_noise_dba_motor3"].cast("integer")) \
    .drop("results") \
    .orderBy("sensor_created_at")

In [40]:
df3 = multiline_df2.withColumn("results", explode("results")) \
    .withColumn("falha_created_at", col("results")["falha_created_at"].cast("timestamp")) \
    .withColumn("falha_id_falha", col("results")["falha_id_falha"].cast("string")) \
    .withColumn("falha_ip", col("results")["falha_ip"].cast("string")) \
    .withColumn("falha_hostname", col("results")["falha_hostname"].cast("string")) \
    .withColumn("falha_event_type", col("results")["falha_event_type"].cast("string")) \
    .withColumn("falha_tipo_falha", col("results")["falha_tipo_falha"].cast("string")) \
    .withColumn("falha_error_code", col("results")["falha_error_code"].cast("string")) \
    .withColumn("falha_error_description", col("results")["falha_error_description"].cast("string")) \
    .drop("results") \
    .orderBy("falha_created_at")

In [41]:
_df_sensor = df2.toPandas()
_df_falhas = df3.toPandas()

In [42]:
_df_sensor

Unnamed: 0,sensor_created_at,sensor_event_type,sensor_id_cycle,sensor_ip,sensor_unique_id,sensor_value_humidity,sensor_value_temperature_motor1,sensor_value_temperature_motor2,sensor_value_temperature_motor3,sensor_value_vibrationhz_x,sensor_value_vibrationhz_y,sensor_value_vibrationhz_z,value_noise_dba_motor1,value_noise_dba_motor2,value_noise_dba_motor3
0,2021-11-22 15:44:30,sensor,3,172.31.0.11,e672bf0c-1149-4e76-8ad3-8859ee38408d,30,13,31,9,13,20,17,43,26,30
1,2021-11-22 15:44:35,sensor,3,172.31.0.11,9d50cc24-d0c7-434a-ab9c-88de65f855de,75,8,27,30,7,12,21,20,60,35
2,2021-11-22 15:44:40,sensor,3,172.31.0.11,6d4da84f-43f9-4f58-b1a4-9546e6e284f0,24,8,11,19,9,21,21,59,63,52
3,2021-11-22 15:44:45,sensor,1,172.31.0.11,5d5162d8-9bd9-47f6-8176-366bf3c4289f,31,19,16,9,8,16,11,36,42,35
4,2021-11-22 15:44:50,sensor,2,172.31.0.11,82193bb1-51c6-42c7-aa98-22534aff58f5,16,16,29,14,11,14,10,49,67,50
5,2021-11-22 15:44:55,sensor,3,172.31.0.11,d5a1af26-9f2d-4398-b43a-bd859bebc9fc,58,10,11,24,9,10,18,27,50,60
6,2021-11-22 15:45:00,sensor,3,172.31.0.11,19d3cfad-9d6b-4c31-aa9a-4ab6e543bf85,54,3,10,9,5,16,19,29,25,56
7,2021-11-22 15:45:05,sensor,1,172.31.0.11,085988af-e76c-43a9-950d-156f4b2b0dc4,29,24,32,32,2,12,13,49,50,41
8,2021-11-22 15:45:10,sensor,2,172.31.0.11,61d44e1b-ca56-4dbf-8589-33db298f1ce8,57,5,22,34,13,21,20,62,41,59
9,2021-11-22 15:45:15,sensor,2,172.31.0.11,38c647b2-d9a4-4f04-9fa5-52526270ad18,19,16,5,34,3,18,21,38,22,28


In [43]:
_df_sensor[_df_sensor["sensor_created_at"] == "2021-11-22 14:27:40"]

Unnamed: 0,sensor_created_at,sensor_event_type,sensor_id_cycle,sensor_ip,sensor_unique_id,sensor_value_humidity,sensor_value_temperature_motor1,sensor_value_temperature_motor2,sensor_value_temperature_motor3,sensor_value_vibrationhz_x,sensor_value_vibrationhz_y,sensor_value_vibrationhz_z,value_noise_dba_motor1,value_noise_dba_motor2,value_noise_dba_motor3


In [44]:
pip install plotly

Note: you may need to restart the kernel to use updated packages.


In [49]:
import plotly.express as px

fig = px.line(_df_sensor, x='sensor_created_at', y="sensor_value_vibrationhz_z")
fig.show()

In [50]:
import plotly.express as px

fig = px.line(_df_sensor, x='sensor_created_at', y="sensor_value_vibrationhz_x")
fig.show()

In [137]:
_df_falhas

Unnamed: 0,falha_created_at,falha_id_falha,falha_ip,falha_hostname,falha_event_type,falha_tipo_falha,falha_error_code,falha_error_description
0,2021-11-22 14:27:35,5f91519c-d079-45d7-9c6e-5174f07ad23c,172.31.0.11,eafd4bd56e36,fail,falha_2,erro_500,Falha Geral
1,2021-11-22 14:27:40,ebdebe44-a623-48b5-bc0d-1a31e0443e2c,172.31.0.11,eafd4bd56e36,fail,falha_3,erro_500,Falha Crítica
2,2021-11-22 14:27:30,ebf4931e-828d-4dad-b8ea-29cb956cb748,172.31.0.11,eafd4bd56e36,fail,falha_1,erro_500,Pequena falha


In [133]:
df2.printSchema()

root
 |-- sensor_created_at: timestamp (nullable = true)
 |-- sensor_event_type: string (nullable = true)
 |-- sensor_id_cycle: integer (nullable = true)
 |-- sensor_ip: string (nullable = true)
 |-- sensor_unique_id: string (nullable = true)
 |-- sensor_value_humidity: integer (nullable = true)
 |-- sensor_value_temperature_motor1: integer (nullable = true)
 |-- sensor_value_temperature_motor2: integer (nullable = true)
 |-- sensor_value_temperature_motor3: integer (nullable = true)
 |-- sensor_value_vibrationhz_x: integer (nullable = true)
 |-- sensor_value_vibrationhz_y: integer (nullable = true)
 |-- sensor_value_vibrationhz_z: integer (nullable = true)
 |-- value_noise_dba_motor1: integer (nullable = true)
 |-- value_noise_dba_motor2: integer (nullable = true)
 |-- value_noise_dba_motor3: integer (nullable = true)



In [134]:
df3.printSchema()

root
 |-- falha_created_at: timestamp (nullable = true)
 |-- falha_id_falha: string (nullable = true)
 |-- falha_ip: string (nullable = true)
 |-- falha_hostname: string (nullable = true)
 |-- falha_event_type: string (nullable = true)
 |-- falha_tipo_falha: string (nullable = true)
 |-- falha_error_code: string (nullable = true)
 |-- falha_error_description: string (nullable = true)



In [135]:
_df_sensor.dtypes

sensor_created_at                  datetime64[ns]
sensor_event_type                          object
sensor_id_cycle                             int32
sensor_ip                                  object
sensor_unique_id                           object
sensor_value_humidity                       int32
sensor_value_temperature_motor1             int32
sensor_value_temperature_motor2             int32
sensor_value_temperature_motor3             int32
sensor_value_vibrationhz_x                  int32
sensor_value_vibrationhz_y                  int32
sensor_value_vibrationhz_z                  int32
value_noise_dba_motor1                      int32
value_noise_dba_motor2                      int32
value_noise_dba_motor3                      int32
dtype: object

In [136]:
_df_falhas.dtypes

falha_created_at           datetime64[ns]
falha_id_falha                     object
falha_ip                           object
falha_hostname                     object
falha_event_type                   object
falha_tipo_falha                   object
falha_error_code                   object
falha_error_description            object
dtype: object

In [113]:
df3 = multiline_df2.withColumn("results", explode("results")) \
    .withColumn("falha_created_at", col("results")["falha_created_at"].cast("timestamp")) \
    .withColumn("falha_id_falha", col("results")["falha_id_falha"].cast("string")) \
    .withColumn("falha_ip", col("results")["falha_ip"].cast("string")) \
    .withColumn("falha_hostname", col("results")["falha_hostname"].cast("string")) \
    .withColumn("falha_event_type", col("results")["falha_event_type"].cast("string")) \
    .withColumn("falha_tipo_falha", col("results")["falha_tipo_falha"].cast("string")) \
    .withColumn("falha_error_code", col("results")["falha_error_code"].cast("string")) \
    .withColumn("falha_error_description", col("results")["falha_error_description"].cast("string")) \
    .drop("results")

In [114]:
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import *

In [48]:
# Define custom schema
schema = StructType([
      StructField("sensor_unique_id",StringType(),True),
      StructField("sensor_ip",StringType(),True),
      StructField("sensor_hostname",StringType(),True),
      StructField("sensor_id_cycle",IntegerType(),True),
      StructField("sensor_event_type",StringType(),True),
      StructField("sensor_value_temperature_motor1",IntegerType(),True),
      StructField("sensor_value_temperature_motor2",IntegerType(),True),
      StructField("sensor_value_temperature_motor3",IntegerType(),True),
      StructField("sensor_value_vibrationHZ_X",IntegerType(),True),
      StructField("sensor_value_vibrationHZ_Y",IntegerType(),True),
      StructField("sensor_value_vibrationHZ_Z",IntegerType(),True),
      StructField("sensor_value_humidity",IntegerType(),True),
      StructField("value_noise_dbA_motor1",IntegerType(),True),
      StructField("value_noise_dbA_motor2",IntegerType(),True),
      StructField("value_noise_dbA_motor3",IntegerType(),True),
      StructField("sensor_created_at",StringType(),True)
])

df_with_schema = spark.read.schema(schema) \
        .json(("hdfs://namenode:8020//raw/sensors/5f2e9510-ce93-4e0a-b1e3-84bbfca312d4"))
df_with_schema.printSchema()
df_with_schema.show()

root
 |-- sensor_unique_id: string (nullable = true)
 |-- sensor_ip: string (nullable = true)
 |-- sensor_hostname: string (nullable = true)
 |-- sensor_id_cycle: integer (nullable = true)
 |-- sensor_event_type: string (nullable = true)
 |-- sensor_value_temperature_motor1: integer (nullable = true)
 |-- sensor_value_temperature_motor2: integer (nullable = true)
 |-- sensor_value_temperature_motor3: integer (nullable = true)
 |-- sensor_value_vibrationHZ_X: integer (nullable = true)
 |-- sensor_value_vibrationHZ_Y: integer (nullable = true)
 |-- sensor_value_vibrationHZ_Z: integer (nullable = true)
 |-- sensor_value_humidity: integer (nullable = true)
 |-- value_noise_dbA_motor1: integer (nullable = true)
 |-- value_noise_dbA_motor2: integer (nullable = true)
 |-- value_noise_dbA_motor3: integer (nullable = true)
 |-- sensor_created_at: string (nullable = true)

+----------------+---------+---------------+---------------+-----------------+-------------------------------+--------------

In [62]:
import pyspark.sql.functions as F
df_with_schema.select(F.col("results.element.sensor_created_at").alias("sensor_created_at"),
                    F.col("results.element.sensor_event_type").alias("sensor_event_type"),
                    F.col("results.results.element.sensor_hostname").alias("sensor_hostname"),
                    F.col("results.element.sensor_id_cycle").alias("sensor_id_cycle"),
                    F.col("results.element.sensor_ip").alias("sensor_ip"), 
                    F.col("results.element.sensor_unique_id").alias("sensor_unique_id"), 
                    F.col("results.element.sensor_value_humidity").alias("sensor_value_humidity"), 
                    F.col("results.element.sensor_value_temperature_motor1").alias("sensor_value_temperature_motor1"),
                    F.col("results.element.sensor_value_temperature_motor2").alias("sensor_value_temperature_motor2"), 
                    F.col("results.element.sensor_value_temperature_motor3").alias("sensor_value_temperature_motor3"),
                    F.col("results.element.sensor_value_vibrationhz_x").alias("sensor_value_vibrationhz_x"),
                    F.col("results.element.sensor_value_vibrationhz_y").alias("sensor_value_vibrationhz_y"), 
                    F.col("results.element.sensor_value_vibrationhz_z").alias("sensor_value_vibrationhz_z"), 
                    F.col("results.element.value_noise_dba_motor1").alias("value_noise_dba_motor1"),
                    F.col("results.element.value_noise_dba_motor2").alias("value_noise_dba_motor2"), 
                    F.col("results.element.value_noise_dba_motor3").alias("value_noise_dba_motor3"))

AnalysisException: cannot resolve '`results.element.sensor_created_at`' given input columns: [sensor_created_at, sensor_event_type, sensor_hostname, sensor_id_cycle, sensor_ip, sensor_unique_id, sensor_value_humidity, sensor_value_temperature_motor1, sensor_value_temperature_motor2, sensor_value_temperature_motor3, sensor_value_vibrationHZ_X, sensor_value_vibrationHZ_Y, sensor_value_vibrationHZ_Z, value_noise_dbA_motor1, value_noise_dbA_motor2, value_noise_dbA_motor3];
'Project ['results.element.sensor_created_at AS sensor_created_at#925, 'results.element.sensor_event_type AS sensor_event_type#926, 'results.results.element.sensor_hostname AS sensor_hostname#927, 'results.element.sensor_id_cycle AS sensor_id_cycle#928, 'results.element.sensor_ip AS sensor_ip#929, 'results.element.sensor_unique_id AS sensor_unique_id#930, 'results.element.sensor_value_humidity AS sensor_value_humidity#931, 'results.element.sensor_value_temperature_motor1 AS sensor_value_temperature_motor1#932, 'results.element.sensor_value_temperature_motor2 AS sensor_value_temperature_motor2#933, 'results.element.sensor_value_temperature_motor3 AS sensor_value_temperature_motor3#934, 'results.element.sensor_value_vibrationhz_x AS sensor_value_vibrationhz_x#935, 'results.element.sensor_value_vibrationhz_y AS sensor_value_vibrationhz_y#936, 'results.element.sensor_value_vibrationhz_z AS sensor_value_vibrationhz_z#937, 'results.element.value_noise_dba_motor1 AS value_noise_dba_motor1#938, 'results.element.value_noise_dba_motor2 AS value_noise_dba_motor2#939, 'results.element.value_noise_dba_motor3 AS value_noise_dba_motor3#940]
+- Relation[sensor_unique_id#771,sensor_ip#772,sensor_hostname#773,sensor_id_cycle#774,sensor_event_type#775,sensor_value_temperature_motor1#776,sensor_value_temperature_motor2#777,sensor_value_temperature_motor3#778,sensor_value_vibrationHZ_X#779,sensor_value_vibrationHZ_Y#780,sensor_value_vibrationHZ_Z#781,sensor_value_humidity#782,value_noise_dbA_motor1#783,value_noise_dbA_motor2#784,value_noise_dbA_motor3#785,sensor_created_at#786] json


In [3]:
# median_income.toPandas()
# gdp.toPandas()
# min_wage.toPandas()
# unemployment.toPandas()

In [4]:
# median_income TRANSFORMATIONS
median_income = median_income.na.drop()
median_income = median_income.withColumnRenamed("20135", "2013")
median_income = median_income.drop("20136")

from pyspark.sql import functions as func
#Use `create_map` to create the map of columns with constant 
df = median_income.withColumn('mapCol', \
                    func.create_map(func.lit('2017'),median_income["2017"],
                                    func.lit('2016'),median_income["2016"],
                                    func.lit('2015'),median_income["2015"],
                                    func.lit('2014'),median_income["2014"],
                                    func.lit('2013'),median_income["2013"],
                                    func.lit('2012'),median_income["2012"],
                                    func.lit('2011'),median_income["2011"],
                                    func.lit('2010'),median_income["2010"]
                                   ) 
                  )
#Use explode function to explode the map 
res = df.select('*',func.explode(df.mapCol).alias('year','household_median_income'))
median_income = res.select('State','year', 'household_median_income')
median_income = median_income.withColumnRenamed("State","state")
median_income.toPandas()

Unnamed: 0,state,year,household_median_income
0,United States,2017,61372
1,United States,2016,60309
2,United States,2015,58476
3,United States,2014,55613
4,United States,2013,56479
...,...,...,...
411,Wyoming,2014,57721
412,Wyoming,2013,71084
413,Wyoming,2012,61517
414,Wyoming,2011,59539


In [5]:
# gdp TRANSFORMATIONS
from pyspark.sql import functions as func

columns_to_drop = ['GeoFips']
gdp = gdp.drop(*columns_to_drop)


#Use `create_map` to create the map of columns with constant 
gdp = gdp.withColumnRenamed("GeoName","State")
df = gdp.withColumn('mapCol', \
                    func.create_map(func.lit('2019'),gdp["2019"],
                                    func.lit('2018'),gdp["2018"],
                                    func.lit('2017'),gdp["2017"],
                                    func.lit('2016'),gdp["2016"],
                                    func.lit('2015'),gdp["2015"],
                                    func.lit('2014'),gdp["2014"],
                                    func.lit('2013'),gdp["2013"],
                                    func.lit('2012'),gdp["2012"],
                                    func.lit('2011'),gdp["2011"],
                                    func.lit('2010'),gdp["2010"]
                                                                    ) 
                  )
                            
#Use explode function to explode the map 
res = df.select('*',func.explode(df.mapCol).alias('year','gdp_state'))
res.toPandas()
gdp = res.select('State','year', 'gdp_state')
gdp = gdp.withColumnRenamed("State","state")
gdp = gdp.withColumn("gdp_state", round(col("gdp_state").cast("float"),2))

In [6]:
# minimum TRANSFORMATION
columns_to_drop = ['State.Minimum.Wage',"Federal.Minimum.Wage", "Effective.Minimum.Wage","Effective.Minimum.Wage.2020.Dollars", "Department.Of.Labor.Uncleaned.Data","Department.Of.Labor.Cleaned.Low.Value", "Department.Of.Labor.Cleaned.Low.Value.2020.Dollars", "Department.Of.Labor.Cleaned.High.Value", "Department.Of.Labor.Cleaned.High.Value.2020.Dollars","Footnote" ]
min_wage = min_wage.drop(*columns_to_drop)
min_wage = min_wage.filter((min_wage.Year == 2019) | (min_wage.Year == 2018) | (min_wage.Year == 2017) | (min_wage.Year == 2016) | (min_wage.Year == 2015) | (min_wage.Year == 2014) | (min_wage.Year == 2013) | (min_wage.Year == 2012) | (min_wage.Year == 2011) | (min_wage.Year == 2010))
min_wage = min_wage.withColumnRenamed("Year","year")
min_wage = min_wage.withColumnRenamed("State","state")
min_wage = min_wage.withColumnRenamed("State.Minimum.Wage.2020.Dollars","min_wage_state")
min_wage = min_wage.withColumnRenamed("Federal.Minimum.Wage.2020.Dollars","min_wage_federal")
min_wage = min_wage.withColumnRenamed("CPI.Average","cpi_average")
min_wage.toPandas()

Unnamed: 0,year,state,min_wage_state,min_wage_federal,cpi_average
0,2010,Alabama,0.00,8.60,218.056
1,2010,Alaska,9.19,8.60,218.056
2,2010,Arizona,8.60,8.60,218.056
3,2010,Arkansas,7.41,8.60,218.056
4,2010,California,9.49,8.60,218.056
...,...,...,...,...,...
535,2019,Virginia,7.34,7.34,255.657
536,2019,Washington,13.66,7.34,255.657
537,2019,West Virginia,8.85,7.34,255.657
538,2019,Wisconsin,7.34,7.34,255.657


In [7]:
# Groupby data and get yearly average by month, cast as integer and rename output column with alias
ep = unemployment.groupBy("Year","FIPS", "State") \
    .agg((avg("Employable Population").cast('integer')).alias('employable_pop'))

e = unemployment.groupBy("Year", "FIPS", "State") \
    .agg((avg("Employed").cast('integer')).alias('employed'))

u = unemployment.groupBy("Year", "FIPS", "State") \
    .agg((avg("Unemployed").cast('integer')).alias('unemployed')) 

#Join employable and employed data by Year, FIPS and State
join1 = ep.join(e, (ep.Year == e.Year) & (ep.FIPS == e.FIPS) & (ep.State == e.State), "inner").select(ep.Year, ep.FIPS, ep.State, ep.employable_pop, e.employed)

#Join join1(last join with employable and employed data) and unemployed
join2 = join1.join(u, (join1.Year == u.Year) & (join1.FIPS == u.FIPS) & (join1.State == u.State), "inner").select(join1.Year, join1.FIPS, join1.State, join1.employable_pop, join1.employed, u.unemployed)

#Lower case year and state columns, order data by FIPS and State and drop FIPS since it's not necessary
cols = ["year", "FIPS"]
unemployment = join2.withColumnRenamed("Year", "year") \
               .withColumnRenamed("State", "state") \
               .orderBy(*cols, ascending=True) \
               .drop("FIPS")
unemployment = unemployment.filter((unemployment.year == 2019) | (unemployment.year == 2018) | (unemployment.year == 2017) | (unemployment.year == 2016) | (unemployment.year == 2015) | (unemployment.year == 2014) | (unemployment.year == 2013) | (unemployment.year == 2012) | (unemployment.year == 2011) | (unemployment.year == 2010))
unemployment.toPandas()

Unnamed: 0,year,state,employable_pop,employed,unemployed
0,2010,Alabama,2195945,1964694,231250
1,2010,Alaska,361908,333426,28482
2,2010,Arizona,3089339,2769378,319960
3,2010,Arkansas,1353665,1242720,110945
4,2010,California,18332666,16092953,2239712
...,...,...,...,...,...
505,2019,Virginia,4410199,4287146,123053
506,2019,Washington,3912666,3745751,166915
507,2019,West Virginia,796610,757690,38920
508,2019,Wisconsin,3104883,3001178,103705


In [8]:
join1 = min_wage.join(gdp, (min_wage.year == gdp.year) & (min_wage.state == gdp.state), "inner").select(min_wage.year, min_wage.state, min_wage.min_wage_state, min_wage.min_wage_federal, min_wage.cpi_average, gdp.gdp_state)
join1.toPandas()

Unnamed: 0,year,state,min_wage_state,min_wage_federal,cpi_average,gdp_state
0,2010,Alabama,0.00,8.60,218.056,1.754701e+05
1,2010,Alaska,9.19,8.60,218.056,5.294770e+04
2,2010,Arizona,8.60,8.60,218.056,2.481253e+05
3,2010,Arkansas,7.41,8.60,218.056,1.009708e+05
4,2010,California,9.49,8.60,218.056,1.973512e+06
...,...,...,...,...,...,...
495,2019,Virginia,7.34,7.34,255.657,5.569052e+05
496,2019,Washington,13.66,7.34,255.657,6.129965e+05
497,2019,West Virginia,8.85,7.34,255.657,7.886390e+04
498,2019,Wisconsin,7.34,7.34,255.657,3.494165e+05


In [9]:
join2 = join1.join(unemployment, (join1.year == unemployment.year) & (join1.state == unemployment.state), "inner").select(join1.year, join1.state, join1.min_wage_state, join1.min_wage_federal, join1.cpi_average, join1.gdp_state, unemployment.employable_pop , unemployment.employed ,unemployment.unemployed)
join2.toPandas()

Unnamed: 0,year,state,min_wage_state,min_wage_federal,cpi_average,gdp_state,employable_pop,employed,unemployed
0,2014,New York,8.74,7.92,236.736,1.425724e+06,9530364,8926771,603592
1,2014,Ohio,7.92,7.92,236.736,5.928762e+05,5703982,5372798,331184
2,2019,Arkansas,9.36,7.34,255.657,1.309541e+05,1362321,1313944,48376
3,2019,Oregon,11.38,7.34,255.657,2.536232e+05,2103961,2024751,79209
4,2019,Virginia,7.34,7.34,255.657,5.569052e+05,4410199,4287146,123053
...,...,...,...,...,...,...,...,...,...
495,2014,Kentucky,7.92,7.92,236.736,1.864190e+05,2006420,1876914,129505
496,2017,Texas,7.65,7.65,245.120,1.665428e+06,13574954,12990073,584881
497,2018,Wisconsin,7.47,7.47,251.107,3.375531e+05,3118346,3024648,93698
498,2019,Arizona,12.14,7.34,255.657,3.701191e+05,3548826,3381846,166979


In [10]:
join3 = join2.join(median_income, (join2.year == median_income.year) & (join2.state == median_income.state), "left_outer").select(join2.year, join2.state, join2.min_wage_state, join2.min_wage_federal, join2.cpi_average, join2.gdp_state, join2.employable_pop , join2.employed ,join2.unemployed, median_income.household_median_income)
join3.toPandas()

Unnamed: 0,year,state,min_wage_state,min_wage_federal,cpi_average,gdp_state,employable_pop,employed,unemployed,household_median_income
0,2014,New York,8.74,7.92,236.736,1.425724e+06,9530364,8926771,603592,56290
1,2014,Ohio,7.92,7.92,236.736,5.928762e+05,5703982,5372798,331184,51454
2,2019,Arkansas,9.36,7.34,255.657,1.309541e+05,1362321,1313944,48376,
3,2019,Oregon,11.38,7.34,255.657,2.536232e+05,2103961,2024751,79209,
4,2019,Virginia,7.34,7.34,255.657,5.569052e+05,4410199,4287146,123053,
...,...,...,...,...,...,...,...,...,...,...
495,2014,Kentucky,7.92,7.92,236.736,1.864190e+05,2006420,1876914,129505,44346
496,2017,Texas,7.65,7.65,245.120,1.665428e+06,13574954,12990073,584881,59295
497,2018,Wisconsin,7.47,7.47,251.107,3.375531e+05,3118346,3024648,93698,
498,2019,Arizona,12.14,7.34,255.657,3.701191e+05,3548826,3381846,166979,


In [11]:
join3 = join3.na.fill(0)
join3.toPandas()

Unnamed: 0,year,state,min_wage_state,min_wage_federal,cpi_average,gdp_state,employable_pop,employed,unemployed,household_median_income
0,2014,New York,8.74,7.92,236.736,1.425724e+06,9530364,8926771,603592,56290
1,2014,Ohio,7.92,7.92,236.736,5.928762e+05,5703982,5372798,331184,51454
2,2019,Arkansas,9.36,7.34,255.657,1.309541e+05,1362321,1313944,48376,
3,2019,Oregon,11.38,7.34,255.657,2.536232e+05,2103961,2024751,79209,
4,2019,Virginia,7.34,7.34,255.657,5.569052e+05,4410199,4287146,123053,
...,...,...,...,...,...,...,...,...,...,...
495,2014,Kentucky,7.92,7.92,236.736,1.864190e+05,2006420,1876914,129505,44346
496,2017,Texas,7.65,7.65,245.120,1.665428e+06,13574954,12990073,584881,59295
497,2018,Wisconsin,7.47,7.47,251.107,3.375531e+05,3118346,3024648,93698,
498,2019,Arizona,12.14,7.34,255.657,3.701191e+05,3548826,3381846,166979,


In [17]:
# Reorder data
data =  join3.select("year",
        "state",
        "household_median_income",
        "min_wage_state",
        "min_wage_federal",
        "cpi_average",
        "unemployed",
        "employed",
        "employable_pop",
        "gdp_state")
data.toPandas()

Unnamed: 0,year,state,household_median_income,min_wage_state,min_wage_federal,cpi_average,unemployed,employed,employable_pop,gdp_state
0,2014,New York,56290,8.74,7.92,236.736,603592,8926771,9530364,1.425724e+06
1,2014,Ohio,51454,7.92,7.92,236.736,331184,5372798,5703982,5.928762e+05
2,2019,Arkansas,,9.36,7.34,255.657,48376,1313944,1362321,1.309541e+05
3,2019,Oregon,,11.38,7.34,255.657,79209,2024751,2103961,2.536232e+05
4,2019,Virginia,,7.34,7.34,255.657,123053,4287146,4410199,5.569052e+05
...,...,...,...,...,...,...,...,...,...,...
495,2014,Kentucky,44346,7.92,7.92,236.736,129505,1876914,2006420,1.864190e+05
496,2017,Texas,59295,7.65,7.65,245.120,584881,12990073,13574954,1.665428e+06
497,2018,Wisconsin,,7.47,7.47,251.107,93698,3024648,3118346,3.375531e+05
498,2019,Arizona,,12.14,7.34,255.657,166979,3381846,3548826,3.701191e+05


In [18]:
from pyspark.sql.types import IntegerType
data = data.fillna({'household_median_income': 0})
data = data.withColumn('household_median_income', regexp_replace(col('household_median_income'), "\\,", "").cast("int"))
data = data.withColumn('min_wage_state', col('min_wage_state').cast("float"))
data = data.withColumn('min_wage_federal', col('min_wage_federal').cast("float"))
data = data.withColumn("cpi_average", round(col("cpi_average").cast("float"),2))
data = data.withColumn('household_median_income', col('household_median_income').cast("float"))

data.toPandas()

Unnamed: 0,year,state,household_median_income,min_wage_state,min_wage_federal,cpi_average,unemployed,employed,employable_pop,gdp_state
0,2014,New York,56290.0,8.74,7.92,236.740005,603592,8926771,9530364,1.425724e+06
1,2014,Ohio,51454.0,7.92,7.92,236.740005,331184,5372798,5703982,5.928762e+05
2,2019,Arkansas,0.0,9.36,7.34,255.660004,48376,1313944,1362321,1.309541e+05
3,2019,Oregon,0.0,11.38,7.34,255.660004,79209,2024751,2103961,2.536232e+05
4,2019,Virginia,0.0,7.34,7.34,255.660004,123053,4287146,4410199,5.569052e+05
...,...,...,...,...,...,...,...,...,...,...
495,2014,Kentucky,44346.0,7.92,7.92,236.740005,129505,1876914,2006420,1.864190e+05
496,2017,Texas,59295.0,7.65,7.65,245.119995,584881,12990073,13574954,1.665428e+06
497,2018,Wisconsin,0.0,7.47,7.47,251.110001,93698,3024648,3118346,3.375531e+05
498,2019,Arizona,0.0,12.14,7.34,255.660004,166979,3381846,3548826,3.701191e+05


In [25]:
data.printSchema()

root
 |-- year: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- household_median_income: float (nullable = true)
 |-- min_wage_state: float (nullable = false)
 |-- min_wage_federal: float (nullable = false)
 |-- cpi_average: float (nullable = true)
 |-- unemployed: integer (nullable = true)
 |-- employed: integer (nullable = true)
 |-- employable_pop: integer (nullable = true)
 |-- gdp_state: float (nullable = false)



In [19]:
data.repartition(1) \
    .write \
    .partitionBy("year") \
    .format("parquet") \
    .mode("overwrite") \
    .save("hdfs://hdfs-nn:9000/TABDG8/warehouse/economy.db/parquet_table")

In [26]:
data.repartition(1) \
    .write \
    .partitionBy("year") \
    .format("parquet") \
    .mode("overwrite") \
    .save("hdfs://hdfs-nn:9000/TABDG8/warehouse/economy.db/parquet_economy")

In [20]:
spark.catalog.recoverPartitions("economy.parquet_table")

spark.sql(
    """
    SELECT *
    FROM economy.parquet_table
    """
).toPandas()

Unnamed: 0,state,household_median_income,min_wage_state,min_wage_federal,cpi_average,unemployed,employed,employable_pop,gdp_state,year
0,Wyoming,57837.0,5.43,7.65,245.119995,12213,280890,293103,37271.199219,2017
1,Massachusetts,73227.0,11.61,7.65,245.119995,138111,3551513,3689625,539973.375000,2017
2,South Carolina,54971.0,0.00,7.65,245.119995,98831,2213204,2312036,223414.000000,2017
3,Nebraska,59619.0,9.50,7.65,245.119995,29562,982397,1011960,120949.703125,2017
4,Idaho,60208.0,7.65,7.65,245.119995,26761,807145,833907,73286.500000,2017
...,...,...,...,...,...,...,...,...,...,...
495,Alabama,0.0,0.00,7.47,251.110001,85942,2130741,2216683,221030.703125,2018
496,Washington,0.0,11.85,7.47,251.110001,169511,3637302,3806814,575416.687500,2018
497,Arkansas,0.0,8.76,7.47,251.110001,49327,1304130,1353458,127761.296875,2018
498,South Dakota,0.0,9.12,7.47,251.110001,14112,445454,459566,53239.000000,2018


In [21]:
df1 = spark.sql(
    """
    SELECT *
    FROM economy.parquet_table
    """
).toPandas()

In [None]:
spark.close()