In [1]:
import os
import re

import pyspark
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, regexp_extract


In [2]:
# Path to the GCP service-account JSON key used for GCS access.
# Read it from the standard GOOGLE_APPLICATION_CREDENTIALS environment variable
# so the key location is not baked into the notebook; fall back to the
# original relative path when the variable is unset.
credentials_location = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS", '../../dtc-gc-37b8d03b4e65.json'
)

# Local Spark using all cores. The GCS connector jar supplies the gs:// filesystem.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('eia')
    .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar")
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
)

sc = SparkContext(conf=conf)

# Register the GCS connector implementations and service-account auth directly
# on the Hadoop configuration that backs this SparkContext.
hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

In [3]:
# Build (or reuse) the SparkSession on top of the SparkContext configured above,
# so it inherits the GCS connector and credential settings.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

In [4]:
import pandas as pd

In [5]:
from pyspark.sql import types

In [6]:
# Explicit, all-nullable schema applied to every EIA production CSV.
# NOTE(review): the df_production.show() output below shows numeric values in
# annual_average / annual_total / state_category. Verify that these field names
# match the CSV's actual column order. Spark applies an explicit schema
# positionally, not by header name.
production_schema = types.StructType([
    types.StructField(column_name, column_type, nullable=True)
    for column_name, column_type in (
        ("state", types.StringType()),
        ("year", types.IntegerType()),
        ("annual_average", types.FloatType()),
        ("annual_total", types.FloatType()),
        ("state_category", types.StringType()),
    )
])

In [10]:
# Granularity of the EIA extract; it also selects the GCS sub-folder to read/write.
period = 'week'
print(f'processing data for {period}s...')

# Inputs live in one sub-folder per year under the period directory.
input_path = f'gs://dtc_data_lake_dtc-gc/data/eia/{period}/*/'
output_path = f'gs://dtc_data_lake_dtc-gc/pq/eia/{period}/'

# Number of parquet part-files written to output_path.
num_output_partitions = 4

df_production = spark.read \
    .option("header", "true") \
    .schema(production_schema) \
    .csv(input_path)

# Replace the `year` column with the first 4-digit run found in each row's
# source file path. The raw string matters: "\d" in a plain literal is an
# invalid escape sequence (warns on import, SyntaxWarning on Python 3.12+).
# NOTE(review): regexp_extract yields a string column (see the dtypes output
# below) even though production_schema declares `year` as IntegerType.
# Consider .cast("int") if downstream readers expect integers.
df_production = df_production.withColumn(
    "year", regexp_extract(input_file_name(), r"(\d{4})", 1)
)

df_production \
    .repartition(num_output_partitions) \
    .write.parquet(output_path, mode='overwrite')


processing data for weeks...


In [42]:
# Inspect the column types of the processed frame. The displayed output below
# comes from running this expression, so it is kept live rather than commented out.
df_production.dtypes

[('state', 'string'),
 ('year', 'string'),
 ('annual_average', 'float'),
 ('annual_total', 'float'),
 ('state_category', 'string')]

In [45]:
# Preview the first rows (Spark's default of 20). Kept live so the table shown
# below is reproducible on Restart & Run All. Note that this triggers a Spark job.
df_production.show()

+--------------+----+--------------+------------+--------------+
|         state|year|annual_average|annual_total|state_category|
+--------------+----+--------------+------------+--------------+
|       Alabama|2004|      333555.0|    352355.0|      348653.0|
|        Alaska|2004|       16078.0|     16984.0|       16805.0|
|       Arizona|2004|      219963.0|    232360.0|      229919.0|
|      Arkansas|2004|         201.0|       212.0|         210.0|
|      Colorado|2004|      747213.0|    789327.0|      781034.0|
|      Illinois|2004|      504316.0|    532739.0|      527142.0|
|       Indiana|2004|      663268.0|    700651.0|      693289.0|
|        Kansas|2004|        2997.0|      3166.0|        3133.0|
|Kentucky Total|2004|     2057293.0|   2173243.0|     2150410.0|
|       Eastern|2004|     1689501.0|   1784722.0|     1765971.0|
|       Western|2004|      367792.0|    388521.0|      384439.0|
|     Louisiana|2004|       80270.0|    103476.0|      104132.0|
|      Maryland|2004|    