In [3]:
import sys
!{sys.executable} -m pip install hdfs


from os import PathLike
from hdfs import InsecureClient
client = InsecureClient("http://hdfs-nn:9870/", user="anonymous")
from_path ="./Drinking_Water_Quality_Distribution_Monitoring_Data.csv"
to_path ="/Projeto/Bronze/Drinking_Water_Quality_Distribution_Monitoring_Data.csv"
client.delete(to_path)
client.upload(to_path, from_path)



'/Projeto/Bronze/Drinking_Water_Quality_Distribution_Monitoring_Data.csv'

In [1]:
pip install delta-spark

Note: you may need to restart the kernel to use updated packages.


In [2]:
from os import PathLike
from hdfs import InsecureClient
from pyspark.sql import SparkSession
from pyspark.sql import Row
from delta import *
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, FloatType
from pyspark.sql.functions import *

# warehouse_location points to the default location for managed databases and tables
warehouse_location = 'hdfs://hdfs-nn:9000/Projeto/Silver'

builder = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("Python Spark DataFrames and SQL") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0") \
    .enableHiveSupport() \

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [4]:
spark.sql(
    """
    create database Projeto location 'hdfs://hdfs-nn:9000/demo/silver/Projeto.db'
    """
)

DataFrame[]

In [2]:
spark.sql(
    """
    SHOW TABLES FROM Projeto
    """
).show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|  projeto|drinking_water_qu...|      false|
+---------+--------------------+-----------+



In [4]:
hdfs_path = "hdfs://hdfs-nn:9000/Projeto/Bronze/Drinking_Water_Quality_Distribution_Monitoring_Data.csv"

customSchema = StructType([
    StructField("Sample_Number", IntegerType(), True),        
    StructField("Sample_Date", StringType(), True),
    StructField("Sample_Time", StringType(), True),
    StructField("Sample_Site", StringType(), True),
    StructField("Sample_class", StringType(), True),
    StructField("Residual_Free_Chlorine__mg_L", FloatType(), True),
    StructField("Turbidity__NTU", FloatType(), True), 
    StructField("Fluoride__mg_L", StringType(), True),
    StructField("Coliform__Quanti_Tray__MPN_100ml", StringType(), True),
    StructField("Ecoli_Quanti_Tray__MPN_100mL", StringType(), True),

])

drinkingWQuality = spark \
            .read\
            .option("delimiter",",")\
            .option("header","true")\
            .schema(customSchema) \
            .csv(hdfs_path)
drinkingWQuality.show()
drinkingWQuality.printSchema()

+-------------+-----------+-----------+-----------+------------+----------------------------+--------------+--------------+--------------------------------+----------------------------+
|Sample_Number|Sample_Date|Sample_Time|Sample_Site|Sample_class|Residual_Free_Chlorine__mg_L|Turbidity__NTU|Fluoride__mg_L|Coliform__Quanti_Tray__MPN_100ml|Ecoli_Quanti_Tray__MPN_100mL|
+-------------+-----------+-----------+-----------+------------+----------------------------+--------------+--------------+--------------------------------+----------------------------+
|    202120243| 07/01/2021|      10:31|      23650|  Compliance|                        0.22|          0.84|          null|                              <1|                          <1|
|    202120244| 07/01/2021|      09:54|      29550|  Compliance|                        0.69|          0.81|          null|                              <1|                          <1|
|    202120245| 07/01/2021|      07:52|      50200| Operational|      

In [7]:
replaced_drinkingWQuality = drinkingWQuality.drop("Fluoride__mg_L")
replaced_drinkingWQuality.toPandas()

Unnamed: 0,Sample_Number,Sample_Date,Sample_Time,Sample_Site,Sample_class,Residual_Free_Chlorine__mg_L,Turbidity__NTU,Coliform__Quanti_Tray__MPN_100ml,Ecoli_Quanti_Tray__MPN_100mL
0,202120243,07/01/2021,10:31,23650,Compliance,0.22,0.84,<1,<1
1,202120244,07/01/2021,09:54,29550,Compliance,0.69,0.81,<1,<1
2,202120245,07/01/2021,07:52,50200,Operational,0.55,0.77,<1,<1
3,202120246,07/01/2021,08:12,50250,Compliance,0.87,0.81,<1,<1
4,202120247,07/01/2021,08:31,50300,Operational,0.80,0.84,<1,<1
...,...,...,...,...,...,...,...,...,...
116119,202120165,06/30/2021,08:25,22950,Compliance,0.39,0.83,<1,<1
116120,202120166,06/30/2021,09:07,27000,Operational,0.70,0.88,<1,<1
116121,202120167,06/30/2021,09:28,27750,Compliance,0.62,0.87,<1,<1
116122,202120168,06/30/2021,11:19,40200,Operational,1.06,0.85,<1,<1


In [8]:
replaced_drinkingWQuality2 = replaced_drinkingWQuality.withColumn('Year', split(replaced_drinkingWQuality['Sample_Date'],'/').getItem(2))
replaced_drinkingWQuality2.toPandas()

Unnamed: 0,Sample_Number,Sample_Date,Sample_Time,Sample_Site,Sample_class,Residual_Free_Chlorine__mg_L,Turbidity__NTU,Coliform__Quanti_Tray__MPN_100ml,Ecoli_Quanti_Tray__MPN_100mL,Year
0,202120243,07/01/2021,10:31,23650,Compliance,0.22,0.84,<1,<1,2021
1,202120244,07/01/2021,09:54,29550,Compliance,0.69,0.81,<1,<1,2021
2,202120245,07/01/2021,07:52,50200,Operational,0.55,0.77,<1,<1,2021
3,202120246,07/01/2021,08:12,50250,Compliance,0.87,0.81,<1,<1,2021
4,202120247,07/01/2021,08:31,50300,Operational,0.80,0.84,<1,<1,2021
...,...,...,...,...,...,...,...,...,...,...
116119,202120165,06/30/2021,08:25,22950,Compliance,0.39,0.83,<1,<1,2021
116120,202120166,06/30/2021,09:07,27000,Operational,0.70,0.88,<1,<1,2021
116121,202120167,06/30/2021,09:28,27750,Compliance,0.62,0.87,<1,<1,2021
116122,202120168,06/30/2021,11:19,40200,Operational,1.06,0.85,<1,<1,2021


In [9]:
replaced_drinkingWQuality3 = replaced_drinkingWQuality2.withColumn(
    "Residual_Free_Chlorine__mg_L",
    when(
        (col("Residual_Free_Chlorine__mg_L").isNull()), 
        "null"
    ).otherwise(col("Residual_Free_Chlorine__mg_L")))

In [10]:
replaced_drinkingWQuality4 = replaced_drinkingWQuality3.withColumn(
    "Turbidity__NTU",
    when(
        (col("Turbidity__NTU").isNull()), 
        "null"
    ).otherwise(col("Turbidity__NTU")))

In [11]:
replaced_drinkingWQuality5 = replaced_drinkingWQuality4.withColumn(
    "Coliform__Quanti_Tray__MPN_100ml",
    when(
        (col("Coliform__Quanti_Tray__MPN_100ml").isNull()), 
        "Sem informação"
    ).otherwise(col("Coliform__Quanti_Tray__MPN_100ml")))

In [12]:
replaced_drinkingWQuality6 = replaced_drinkingWQuality5.withColumn(
    "Ecoli_Quanti_Tray__MPN_100mL",
    when(
        (col("Ecoli_Quanti_Tray__MPN_100mL").isNull()), 
        "Sem informação"
    ).otherwise(col("Ecoli_Quanti_Tray__MPN_100mL")))

In [7]:
spark.sql(
    """
    CREATE EXTERNAL TABLE Projeto.Drinking_Water_Quality_Distribution_Monitoring_Data (
        Sample_Number INT, 
        Sample_Date VARCHAR(50),
        Sample_Time VARCHAR(50),
        Sample_Site VARCHAR(50),
        Sample_class VARCHAR(50),
        Residual_Free_Chlorine__mg_L INT,
        Turbidity__NTU INT,
        Coliform__Quanti_Tray__MPN_100ml VARCHAR(50),
        Ecoli_Quanti_Tray__MPN_100mL VARCHAR(50)
   
    )
     USING DELTA
    PARTITIONED BY (
        Year INT
    )
    LOCATION 'hdfs://hdfs-nn:9000/Projeto/Silver/Projeto.db/Drinking_Water_Quality_Distribution_Monitoring_Data'
    """
)

DataFrame[]

In [5]:
spark.sql(
    """
    DROP TABLE IF EXISTS Projeto.Drinking_Water_Quality_Distribution_Monitoring_Data
    """
)

DataFrame[]

In [14]:
replaced_drinkingWQuality6 \
    .select("Sample_Number", "Sample_Date", "Sample_Time", "Sample_Site", "Sample_class", "Residual_Free_Chlorine__mg_L", "Turbidity__NTU", "Coliform__Quanti_Tray__MPN_100ml", "Ecoli_Quanti_Tray__MPN_100mL", "Year") \
    .write \
    .mode("overwrite") \
    .partitionBy("Year") \
    .format("delta") \
    .save("hdfs://hdfs-nn:9000/Projeto/Silver/Projeto.db/Drinking_Water_Quality_Distribution_Monitoring_Data/deltalake_table/")

In [15]:
replaced_drinkingWQuality6.toPandas()

Unnamed: 0,Sample_Number,Sample_Date,Sample_Time,Sample_Site,Sample_class,Residual_Free_Chlorine__mg_L,Turbidity__NTU,Coliform__Quanti_Tray__MPN_100ml,Ecoli_Quanti_Tray__MPN_100mL,Year
0,202120243,07/01/2021,10:31,23650,Compliance,0.22,0.84,<1,<1,2021
1,202120244,07/01/2021,09:54,29550,Compliance,0.69,0.81,<1,<1,2021
2,202120245,07/01/2021,07:52,50200,Operational,0.55,0.77,<1,<1,2021
3,202120246,07/01/2021,08:12,50250,Compliance,0.87,0.81,<1,<1,2021
4,202120247,07/01/2021,08:31,50300,Operational,0.8,0.84,<1,<1,2021
...,...,...,...,...,...,...,...,...,...,...
116119,202120165,06/30/2021,08:25,22950,Compliance,0.39,0.83,<1,<1,2021
116120,202120166,06/30/2021,09:07,27000,Operational,0.7,0.88,<1,<1,2021
116121,202120167,06/30/2021,09:28,27750,Compliance,0.62,0.87,<1,<1,2021
116122,202120168,06/30/2021,11:19,40200,Operational,1.06,0.85,<1,<1,2021
