In [2]:
from pyspark.sql.types import StructType

In [3]:
pip install delta-spark

Note: you may need to restart the kernel to use updated packages.


In [6]:
from os import PathLike
from hdfs import InsecureClient
from pyspark.sql import SparkSession
from pyspark.sql import Row
from delta import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Default storage location for managed databases and tables (Spark SQL warehouse dir).
warehouse_location = 'hdfs://hdfs-nn:9000/Projeto/Silver'

# Local two-thread session with Hive metastore access and the Delta Lake SQL
# extension / catalog enabled.
# NOTE(review): configure_spark_with_delta_pip may set "spark.jars.packages" itself
# from the installed delta-spark version — confirm which value ends up in effect.
builder = (
    SparkSession.builder
    .master("local[2]")
    .appName("Python Spark DataFrames and SQL")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")
    .enableHiveSupport()
)

# Let the delta-spark helper finish configuring the builder, then start (or reuse) the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [8]:
 # Load the Bronze-layer inspection results CSV with an explicit schema.
hdfs_path = "hdfs://hdfs-nn:9000/Projeto/Bronze/Parks_Supervisor_Inspections_Inspection_Results.csv"

# Every column except the integer Inspection_ID is read as a nullable string;
# the list order below is the column order of the schema.
customSchema = StructType(
    [StructField("Inspection_ID", IntegerType(), True)]
    + [
        StructField(column_name, StringType(), True)
        for column_name in (
            "omppropid",
            "Site_Name",
            "Boro",
            "District",
            "Inspection_Date",
            "PIP_Category",
            "Overall_Rating",
            "Cleanliness_Rating",
        )
    ]
)

# Comma-delimited file whose first line is a header row.
Parks_Supervisor_Inspections_Inspection_Results = (
    spark.read
    .option("delimiter", ",")
    .option("header", "true")
    .schema(customSchema)
    .csv(hdfs_path)
)

In [12]:
 # Drop the ProjetoGold database together with all of its tables, if it exists.
# NOTE(review): the database created afterwards in this notebook is `Projeto`,
# not `ProjetoGold` — confirm that dropping ProjetoGold here is intended.
spark.sql("""
    DROP DATABASE IF EXISTS ProjetoGold CASCADE
    """)

DataFrame[]

In [13]:
# Create the Silver database at its HDFS location.
# IF NOT EXISTS keeps the cell re-runnable: nothing earlier in the notebook drops
# `Projeto`, so a plain CREATE DATABASE fails on every run after the first.
spark.sql(
    """
    CREATE DATABASE IF NOT EXISTS Projeto LOCATION 'hdfs://hdfs-nn:9000/Projeto/Silver/Projeto.db/'
    """
)

DataFrame[]

In [14]:
# Working name for the raw inspections DataFrame; the cleaning steps start from it.
Alteracao_inspecoes = Parks_Supervisor_Inspections_Inspection_Results

In [15]:
# 'Data' holds the text before the first space of Inspection_Date
# (e.g. "08/23/2015" from "08/23/2015 12:00:...").
Alteracao1_inspecoes = Alteracao_inspecoes.withColumn(
    'Data',
    split(col('Inspection_Date'), ' ')[0],
)

In [16]:
# 'Year' is the third '/'-separated piece of 'Data' (e.g. 2015 from "08/23/2015").
# It is cast to integer so this cell agrees with the later re-derivation of the
# same column and with the `Year int` partition column of Projeto.Inspecoes.
Alteracao2_inspecoes = Alteracao1_inspecoes.withColumn(
    'Year',
    split(Alteracao1_inspecoes['Data'], '/').getItem(2).cast("integer"),
)

In [17]:
# Derive the helper columns from the raw frame in a single cell.
Alteracao_inspecoes = Parks_Supervisor_Inspections_Inspection_Results

# 'Data': the part of Inspection_Date before the first space.
Alteracao1_inspecoes = Alteracao_inspecoes.withColumn(
    'Data',
    split(col('Inspection_Date'), ' ')[0],
)

# 'Year': third '/'-separated piece of 'Data', stored as an integer.
Alteracao2_inspecoes = Alteracao1_inspecoes.withColumn(
    'Year',
    split(col('Data'), '/')[2].cast("integer"),
)

In [18]:
# Placeholder texts written in place of NULLs.
_UNDEFINED = "Indefinido"
_NOT_RATED = "Nao Avaliado"

# Borough code -> borough name.
_BORO_NAMES = {
    'B': 'Brooklyn',
    'M': 'Manhattan',
    'Q': 'Queens',
    'R': 'Staten Island',
    'X': 'Bronx',
}

# Rating code -> rating label (shared by Overall_Rating and Cleanliness_Rating).
_RATING_LABELS = {
    'A': 'Acceptable',
    'U': 'Unacceptable',
    'U/S': 'Very Unacceptable',
}


def _fill_nulls(df, column, placeholder):
    """Return `df` with every NULL in `column` replaced by `placeholder`.

    Non-NULL values are left untouched.
    """
    return df.withColumn(
        column,
        when(col(column).isNull(), placeholder).otherwise(col(column)),
    )


def _decode(df, column, labels):
    """Return `df` with the codes found in `column` replaced by their labels.

    `labels` maps a code to its replacement text. Codes are compared for exact
    equality. Any value that is not a key of `labels` (including the NULL
    placeholders) is kept unchanged instead of being turned into NULL, which is
    what a `when` chain without `otherwise` would do.
    """
    if not labels:
        return df
    decoded = None
    for code, label in labels.items():
        is_code = col(column) == code
        decoded = when(is_code, label) if decoded is None else decoded.when(is_code, label)
    return df.withColumn(column, decoded.otherwise(col(column)))


# Each step keeps its original variable name so later cells can still refer to
# any intermediate DataFrame.
Alteracao3_inspecoes = _fill_nulls(Alteracao2_inspecoes, "omppropid", _UNDEFINED)
Alteracao4_inspecoes = _fill_nulls(Alteracao3_inspecoes, "Site_Name", _UNDEFINED)
Alteracao5_inspecoes = _fill_nulls(Alteracao4_inspecoes, "Boro", _UNDEFINED)

# Expand borough codes into borough names.
Alteracao6_inspecoes = _decode(Alteracao5_inspecoes, "Boro", _BORO_NAMES)

# Sanity check: shows, for the first 20 rows, whether Boro still equals the raw code "B".
Alteracao6_inspecoes.select(col("Boro") == "B").show()

Alteracao7_inspecoes = _fill_nulls(Alteracao6_inspecoes, "District", _UNDEFINED)
Alteracao8_inspecoes = _fill_nulls(Alteracao7_inspecoes, "PIP_Category", _UNDEFINED)
Alteracao9_inspecoes = _fill_nulls(Alteracao8_inspecoes, "Overall_Rating", _NOT_RATED)
Alteracao10_inspecoes = _fill_nulls(Alteracao9_inspecoes, "Cleanliness_Rating", _NOT_RATED)

# Expand rating codes into readable labels.
Alteracao11_inspecoes = _decode(Alteracao10_inspecoes, "Overall_Rating", _RATING_LABELS)
Alteracao12_inspecoes = _decode(Alteracao11_inspecoes, "Cleanliness_Rating", _RATING_LABELS)

+----------+
|(Boro = B)|
+----------+
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
+----------+
only showing top 20 rows



In [19]:
# Remove any previous definition of the table from the catalog.
spark.sql("""
    DROP TABLE IF EXISTS Projeto.Inspecoes
    """)

# Declare Projeto.Inspecoes as a Delta table at its HDFS location,
# partitioned by Year and PIP_Category.
spark.sql("""
    CREATE EXTERNAL TABLE Projeto.Inspecoes (
    
        Inspection_ID INT,
        omppropid string,
        Site_Name string,
        Boro string,
        District string,
        Inspection_Date string,
        Overall_Rating string,
        Cleanliness_Rating string,
        Data string
        )
        
        USING DELTA
        PARTITIONED BY(
        Year int,
        PIP_Category string
         
        

    )
    
    
    LOCATION 'hdfs://hdfs-nn:9000/Projeto/Silver/Projeto.db/Inspecoes'
    """)

DataFrame[]

In [20]:
# Write the cleaned inspections in Delta format to the table's HDFS location,
# overwriting what is there and partitioning the files by Year and PIP_Category.
(
    Alteracao12_inspecoes
    .select(
        "Inspection_ID",
        "omppropid",
        "Site_Name",
        "Boro",
        "District",
        "Inspection_Date",
        "PIP_Category",
        "Overall_Rating",
        "Cleanliness_Rating",
        "Data",
        "Year",
    )
    .write
    .format("delta")
    .mode("overwrite")
    .partitionBy("Year", "PIP_Category")
    .save("hdfs://hdfs-nn:9000/Projeto/Silver/Projeto.db/Inspecoes/")
)

In [22]:
# Preview the rows whose Overall_Rating is 'Unacceptable'.
Alteracao12_inspecoes.where(col("Overall_Rating") == 'Unacceptable').show()

+-------------+---------+--------------------+---------+--------+--------------------+------------+--------------+------------------+----------+----+
|Inspection_ID|omppropid|           Site_Name|     Boro|District|     Inspection_Date|PIP_Category|Overall_Rating|Cleanliness_Rating|      Data|Year|
+-------------+---------+--------------------+---------+--------+--------------------+------------+--------------+------------------+----------+----+
|        38296|     M015|       Columbus Park|Manhattan|    M-01|08/23/2015 12:00:...|  Playground|  Unacceptable|      Unacceptable|08/23/2015|2015|
|        38327|     M086|   Stuyvesant Square|Manhattan|    M-06|08/20/2015 12:00:...|  Small Park|  Unacceptable|        Acceptable|08/20/2015|2015|
|        38486|  M140-01| Ten Mile River Plgd|Manhattan|    M-14|08/24/2015 12:00:...|  Playground|  Unacceptable|        Acceptable|08/24/2015|2015|
|        38515|     M158|Robert Moses Play...|Manhattan|    M-06|08/24/2015 12:00:...|  Playground| 