In [1]:
pip install delta-spark

Note: you may need to restart the kernel to use updated packages.


In [2]:
from os import PathLike
from hdfs import InsecureClient
from pyspark.sql import SparkSession
from pyspark.sql import Row
from delta import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

# warehouse_location points to the default location for managed databases and tables
warehouse_location = 'hdfs://hdfs-nn:9000/Qualidade_NYC/silver'

builder = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("Python Spark DataFrames and SQL") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0") \
    .enableHiveSupport() \

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [3]:
hdfs_path = "hdfs://hdfs-nn:9000/Qualidade_NYC/bronze/Watershed_Water_Quality_-_Wastewater.csv"
#define the schema for the dataframe
customSchema = StructType([
    StructField("Sample Id", StringType(), True),        
    StructField("Site", StringType(), True),
    StructField("Sample Start Date", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("Start Time", StringType(), True),
    StructField("Time", StringType(), True),
    StructField("Analyte", StringType(), True),
    StructField("Status", StringType(), True),
    StructField("Final Result", StringType(), True),
    StructField("Units", StringType(), True),
    StructField("Water Treatment Plant", StringType(), True),
    StructField("WTP Group", StringType(), True),
    StructField("SPDES Number", StringType(), True)
])

waterDF= spark \
            .read\
            .option("delimiter",",")\
            .option("header","true")\
            .schema(customSchema) \
            .csv(hdfs_path)
waterDF.show()
waterDF.printSchema()

+------------+---------------+-----------------+----------+------------+------------+--------------------+------+------------+-----+---------------------+--------------------+------------+
|   Sample Id|           Site|Sample Start Date|      Date|  Start Time|        Time|             Analyte|Status|Final Result|Units|Water Treatment Plant|           WTP Group|SPDES Number|
+------------+---------------+-----------------+----------+------------+------------+--------------------+------+------------+-----+---------------------+--------------------+------------+
|K00018239002|            STE|       01/01/2013|01/01/2013|09:35:00.000|15:35:00.000|Phosphorus, Total...|  Done|          10| µg/L|         Tannersville|City Owned - Effl...|   NY0026573|
|E-1805379-01|CLEAR POOL CAMP|             null|01/02/2019|        null|        null|Phosphorus, Total...|Cancel|      Cancel| µg/L|                 null|      Non-City Owned|   NY0098621|
|K00018239005|            SGE|       01/01/2013|01/01/2

In [3]:
spark.sql(
    """
    create database Projeto location 'hdfs://hdfs-nn:9000/Qualidade_NYC/silver/Projeto.db'
    """
)

AnalysisException: Namespace 'Projeto' already exists

In [4]:
from pyspark.sql.functions import when, col, concat, lit

replaced_water1 = waterDF.withColumn("Status",
       when(col("Final Result")=="Error" , "Failure")
       .otherwise(col("Status")))

replaced_water2 = replaced_water1.withColumn("Status",
       when(col("Status")=="" ,None)
       .otherwise(col("Status")))

replaced_water3 = replaced_water2.withColumn("SPDES Number",
       when(col("SPDES Number")=="" ,None)
       .otherwise(col("SPDES Number")))

replaced_water4 = replaced_water3.withColumn("Water Treatment Plant",
       when(col("Water Treatment Plant")=="" ,None)
       .otherwise(col("Water Treatment Plant")))

replaced_water5 = replaced_water4.withColumn("Time",
       when(col("Time")=="" ,None)
       .otherwise(col("Time")))


#replaced_water6 = replaced_water5.select(col("Date"),to_date
#       (col("Date"),"MM/dd/yyyy").alias("date"))

replaced_water6 = replaced_water5.select('Date', substring('Date', 7,4).alias('year'))

replaced_water6 = replaced_water5.withColumn('Year', split(replaced_water5['Date'],'/').getItem(2))

replaced_water7 = replaced_water6.drop("Start Time")

replaced_water8 = replaced_water7.drop("Sample Start Date")

replaced_water = replaced_water8.withColumnRenamed("Sample Id","Sample_Id") \
                                .withColumnRenamed("Final Result","Final_Result")\
                                .withColumnRenamed("Water Treatment Plant","Water_Treatment_Plant")\
                                .withColumnRenamed("WTP Group","WTP_Group")\
                                .withColumnRenamed("SPDES Number","SPDES_Number")

replaced_water.toPandas()

Unnamed: 0,Sample_Id,Site,Date,Time,Analyte,Status,Final_Result,Units,Water_Treatment_Plant,WTP_Group,SPDES_Number,Year
0,K00018239002,STE,01/01/2013,15:35:00.000,"Phosphorus, Total (as P)",Done,10,µg/L,Tannersville,City Owned - Effluent,NY0026573,2013
1,E-1805379-01,CLEAR POOL CAMP,01/02/2019,,"Phosphorus, Total (as P)",Cancel,Cancel,µg/L,,Non-City Owned,NY0098621,2019
2,K00018239005,SGE,01/01/2013,15:00:00.000,"Phosphorus, Total (as P)",Done,23,µg/L,Grand Gorge,City Owned - Effluent,NY0026565,2013
3,K00018239008,EPE,01/01/2013,15:00:00.000,"Phosphorus, Total (as P)",Done,7,µg/L,Pine Hill,City Owned - Effluent,NY0026557,2013
4,K00018239007,EPR,01/01/2013,15:00:00.000,BOD,Done,52.2,mg/L,Pine Hill,City Owned - Influent,NY0026557,2013
...,...,...,...,...,...,...,...,...,...,...,...,...
619597,K-2104941-02,Ashland WTP,12/29/2021,09:17:00.000,Dissolved Oxygen,Done,10.4,mg/L,Schoharie,Non-City Owned,NY0263214,2021
619598,K-2104941-02,Ashland WTP,12/29/2021,09:17:00.000,pH,Done,6.16,,Schoharie,Non-City Owned,NY0263214,2021
619599,K-2104941-02,Ashland WTP,12/29/2021,09:17:00.000,Temperature,Done,8.9,C,Schoharie,Non-City Owned,NY0263214,2021
619600,K-2104941-02,Ashland WTP,12/29/2021,09:17:00.000,"Coliform, Fecal",Done,<1,FC/100mL,Schoharie,Non-City Owned,NY0263214,2021


In [9]:
spark.sql(
    """
    DROP TABLE IF EXISTS Projeto.replaced_water
    """
)

DataFrame[]

In [5]:
spark.sql(
    """
    CREATE EXTERNAL TABLE Trabalho.Qualidade_Agua (
        Sample_Id VARCHAR(50),
        Site VARCHAR(50),
        Date VARCHAR(50),
        Time VARCHAR(50),
        Analyte VARCHAR(50),
        Status VARCHAR(50),
        Final_Result VARCHAR(50),
        Units VARCHAR(50),
        Water_Treatment_Plant VARCHAR(50),
        WTP_Group VARCHAR(50),
        SPDES_Number VARCHAR(50)
       
    )
    USING DELTA
    
    PARTITIONED BY (
     Year INT
    )
    LOCATION 'hdfs://hdfs-nn:9000/Qualidade_NYC/silver/Projeto.db/Qualidade_Agua'
    """
)

DataFrame[]

In [6]:
#write df to hive deltalake_table
replaced_water\
    .select("Sample_Id","Site","Date","Time","Analyte","Status",
            "Final_Result","Units","Water_Treatment_Plant","WTP_Group","SPDES_Number","Year") \
    .write \
    .mode("overwrite") \
    .partitionBy("Year") \
    .format("delta") \
    .save("hdfs://hdfs-nn:9000/Qualidade_NYC/silver/Projeto.db/Qualidade_Agua/deltalake_table/")
from pyspark.sql.types import *