In [1]:
from pyspark.sql import SparkSession
import os

In [10]:
# Execution mode switch: True runs Spark in-process with 4 local threads,
# False targets the Kubernetes API server reachable from inside the cluster.
local=False

# Settings shared by both modes. Hive support is enabled here rather than only
# in the Kubernetes branch: the CREATE EXTERNAL TABLE ... STORED as parquet
# statement used further down in this notebook is Hive DDL and needs a
# Hive-enabled session in local mode as well.
builder=SparkSession.builder.appName("spark_etl_demo").enableHiveSupport()

if local:
    builder=builder.master("local[4]")
else:
    # IMAGE_NAME, KUBERNETES_SERVICE_ACCOUNT and KUBERNETES_NAMESPACE are read
    # from the environment; a missing variable raises KeyError right here,
    # before any session is created.
    builder=(
        builder
        .master("k8s://https://kubernetes.default.svc:443")
        .config("spark.kubernetes.container.image",os.environ["IMAGE_NAME"])
        .config("spark.kubernetes.authenticate.driver.serviceAccountName",os.environ['KUBERNETES_SERVICE_ACCOUNT'])
        .config("spark.kubernetes.namespace", os.environ['KUBERNETES_NAMESPACE'])
        .config("spark.executor.instances", "4")
        .config("spark.executor.memory","4g")
        .config("spark.driver.memory","8g")
    )

# getOrCreate() reuses an already running session if there is one.
spark=builder.getOrCreate()
    
def set_log_level(spark_session,log_level:str):
    """Apply a log4j level to the "org" and "akka" loggers of a Spark session.

    The log4j classes are reached through ``spark_session.sparkContext._jvm``.

    Args:
        spark_session: Session whose ``sparkContext._jvm`` gateway is used to
            access ``org.apache.log4j``.
        log_level: Name of the level to apply; must be "INFO", "WARN" or "ERROR".

    Raises:
        ValueError: If ``log_level`` is not one of the three supported names.
    """
    log4j = spark_session.sparkContext._jvm.org.apache.log4j
    supported_levels = ("INFO", "WARN", "ERROR")
    if log_level not in supported_levels:
        raise ValueError("The log_level must be INFO, WARN or ERROR")
    # Look the level constant up by name (e.g. Level.ERROR).
    target_level = getattr(log4j.Level, log_level)
    for logger_name in ("org", "akka"):
        log4j.LogManager.getLogger(logger_name).setLevel(target_level)
    
# Set the "org" and "akka" loggers of this session to the ERROR level.
set_log_level(spark_session=spark,log_level="ERROR")

In [3]:
# Bucket root and relative object path of the source parquet dataset.
work_dir="s3a://pengfei"
parquet_file_name="diffusion/data_format/sf_fire/parquet/raw"
# Full dataset URI: the two parts joined with a single "/".
data_path="/".join((work_dir,parquet_file_name))

# Step 1: Prepare source dataframe

Use the Spark session to read a parquet file and return a data frame.

In [12]:
# Read the parquet dataset at data_path into a data frame.
parquet_reader=spark.read
df_raw=parquet_reader.parquet(data_path)

In [15]:
# Dimensions of the raw data frame: number of columns and number of rows.
col_nb=len(df_raw.columns)
row_nb=df_raw.count()

print(f"data frame has : {row_nb} rows and {col_nb} columns")



data frame has : 5500519 rows and 34 columns


                                                                                

In [16]:
# Keep only the five columns used in the rest of the notebook, then preview them.
selected_columns=["IncidentNumber","CallType","CallDate","City","NeighborhoodDistrict"]
df=df_raw.select(*selected_columns)
df.show()

[Stage 13:>                                                         (0 + 1) / 1]

+--------------+--------------------+----------+-------------+--------------------+
|IncidentNumber|            CallType|  CallDate|         City|NeighborhoodDistrict|
+--------------+--------------------+----------+-------------+--------------------+
|      21017645|              Alarms|02/08/2021|San Francisco|           Lakeshore|
|      21017596|              Alarms|02/08/2021|San Francisco|         Mission Bay|
|      21017578|Citizen Assist / ...|02/08/2021|San Francisco|              Marina|
|      21017552|               Other|02/08/2021|    Daly City|                None|
|      21017398|              Alarms|02/07/2021|San Francisco|   Lone Mountain/USF|
|      21017307|              Alarms|02/07/2021|San Francisco|           Japantown|
|      21017263|        Outside Fire|02/07/2021|San Francisco|Bayview Hunters P...|
|      21017206|              Alarms|02/07/2021|San Francisco| Castro/Upper Market|
|      21017173|    Medical Incident|02/07/2021|San Francisco|             M

                                                                                

# Step 2: Create a table in the Hive metastore

Use the Spark data frame to create a Hive table in the Hive metastore, so that we can reuse it later.


In [18]:
# Name of the Hive table created, queried and dropped in the cells below.
table_name = "sf_fire"

In [19]:
# Turn the (column name, type) pairs of the data frame into the column list of
# the DDL statement, e.g. "IncidentNumber int, CallType string".
column_defs = (f"{col_name} {col_type}" for col_name, col_type in df.dtypes)
schema_str = ', '.join(column_defs)

# External table: the metastore entry points at the existing parquet files
# under data_path.
create_table_sql = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {table_name}
({schema_str})
STORED as parquet LOCATION '{data_path}'
"""
spark.sql(create_table_sql)


2022-09-15 15:38:36,952 INFO hive.log: Updating table stats fast for sf_fire
2022-09-15 15:38:36,953 INFO hive.log: Updated size of table sf_fire to 372045123


DataFrame[]

In [20]:
# List the tables known to the session to check that the new table is registered.
tables_df=spark.sql('show tables;')
tables_df.show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|  sf_fire|      false|
+---------+---------+-----------+



Now your Hive table has been created. In the background, if you enabled the listener, the metadata of this Hive table is uploaded to our [data catalog](https://atlas.lab.sspcloud.fr/index.html#!/search). So you can easily find all your Hive tables even if you no longer have the notebook.

You can try to use the search engine of our [data catalog](https://atlas.lab.sspcloud.fr/index.html#!/search) to find your table. 

In [22]:
# Read five rows back through the Hive table to check that it is queryable.
preview_query=f"""SELECT * FROM {table_name} limit 5"""
spark.sql(preview_query).show()



+--------------+--------------------+----------+-------------+--------------------+
|IncidentNumber|            CallType|  CallDate|         City|NeighborhoodDistrict|
+--------------+--------------------+----------+-------------+--------------------+
|      21017645|              Alarms|02/08/2021|San Francisco|           Lakeshore|
|      21017596|              Alarms|02/08/2021|San Francisco|         Mission Bay|
|      21017578|Citizen Assist / ...|02/08/2021|San Francisco|              Marina|
|      21017552|               Other|02/08/2021|    Daly City|                None|
|      21017398|              Alarms|02/07/2021|San Francisco|   Lone Mountain/USF|
+--------------+--------------------+----------+-------------+--------------------+



                                                                                

In [25]:
# Ten call types with the highest number of (non-null) incident numbers.
call_type_query=f"""select CallType, count(IncidentNumber) as incidentNum from {table_name} group by CallType order by incidentNum desc limit 10"""
spark.sql(call_type_query).show()



+--------------------+-----------+
|            CallType|incidentNum|
+--------------------+-----------+
|    Medical Incident|    3596332|
|      Structure Fire|     681179|
|              Alarms|     599263|
|   Traffic Collision|     224909|
|               Other|      87468|
|Citizen Assist / ...|      82173|
|        Outside Fire|      68491|
|        Water Rescue|      28253|
|        Vehicle Fire|      25512|
|Gas Leak (Natural...|      22961|
+--------------------+-----------+



                                                                                

In [26]:
# Ten neighborhood districts with the highest number of (non-null) incident numbers.
district_query=f"""select NeighborhoodDistrict, count(IncidentNumber) as incidentNum from {table_name} group by NeighborhoodDistrict order by incidentNum desc limit 10"""
spark.sql(district_query).show()



+--------------------+-----------+
|NeighborhoodDistrict|incidentNum|
+--------------------+-----------+
|          Tenderloin|     733360|
|     South of Market|     531853|
|             Mission|     498262|
|Financial Distric...|     371420|
|Bayview Hunters P...|     298034|
|     Sunset/Parkside|     213810|
|    Western Addition|     201971|
|            Nob Hill|     181346|
|      Outer Richmond|     146711|
|        Hayes Valley|     135540|
+--------------------+-----------+



                                                                                

# Step 3: Drop the table

You can delete your table if you don't need it anymore. You will notice that the metadata of the deleted table is `removed` from the data catalog too.

In [27]:
# Drop the table; "if exists" makes the statement a no-op when it is already gone.
# NOTE(review): the table was created as EXTERNAL, so this should only remove the
# metastore entry and leave the parquet files in place. Please confirm.
drop_result=spark.sql(f"""drop table if exists {table_name}""")
drop_result.show()
# List the tables again to confirm the table is no longer registered.
remaining_tables=spark.sql('show tables;')
remaining_tables.show()

++
||
++
++

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+

