In [1]:
from os import PathLike
from hdfs import InsecureClient
from pyspark.sql import SparkSession
from pyspark.sql import Row
from delta import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

warehouse_location = 'hdfs://hdfs-nn:9870/trabalhoarvores'

builder = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("Python Spark DataFrames and SQL") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0") \
    .enableHiveSupport() \

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [2]:
hdfs_path = "hdfs://hdfs-nn:9000/trabalhoarvores/bronze/new_york_tree_census_2015.csv"

customSchema = StructType([
    StructField("tree_id", IntegerType(), True),        
    StructField("block_id", IntegerType(), True),
    StructField("created_at", StringType(), True),
    StructField("tree_dbh", DateType(), True),
    StructField("stump_diam", IntegerType(), True),
    StructField("curb_loc", StringType(), True),
    StructField("status", StringType(), True),
    StructField("health", StringType(), True),
    StructField("spc_latin", StringType(), True),
    StructField("spc_common", StringType(), True),
    StructField("steward", StringType(), True),
    StructField("guards", StringType(), True),
    StructField("sidewalk", StringType(), True),
    StructField("user_type", StringType(), True),
    StructField("problems", StringType(), True),
    StructField("root_stone", StringType(), True),
    StructField("root_grate", StringType(), True),
    StructField("root_other", StringType(), True),
    StructField("trunk_wire", StringType(), True),
    StructField("trnk_light", StringType(), True),
    StructField("trnk_other", StringType(), True),
    StructField("brch_light", StringType(), True),
    StructField("brch_shoe", StringType(), True),
    StructField("brch_other", StringType(), True),
    StructField("address", StringType(), True),
    StructField("zipcode", IntegerType(), True),
    StructField("zip_city", StringType(), True),
    StructField("cb_num", IntegerType(), True),
    StructField("borocode", IntegerType(), True),
    StructField("boroname", StringType(), True),
    StructField("cncldist", IntegerType(), True),
    StructField("st_assem", IntegerType(), True),
    StructField("st_senate", IntegerType(), True),
    StructField("nta", StringType(), True),
    StructField("nta_name", StringType(), True),
    StructField("boro_ct", StringType(), True),
    StructField("state", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("x_sp", StringType(), True),
    StructField("y_sp", StringType(), True)
])
tree_census = spark \
            .read\
            .option("delimiter",";")\
            .option("header","true")\
            .schema(customSchema) \
            .csv(hdfs_path)
tree_census.toPandas()

Unnamed: 0,tree_id,block_id,created_at,tree_dbh,stump_diam,curb_loc,status,health,spc_latin,spc_common,...,st_assem,st_senate,nta,nta_name,boro_ct,state,latitude,longitude,x_sp,y_sp
0,606945,305778,28-06-2016,,0,OnCurb,Alive,Good,Fraxinus pennsylvanica,green ash,...,25,14,QN37,Kew Gardens Hills,4125700,New York,4.072.433.932,-7.380.518.011,1.038.250.055,2.032.329.417
1,160321,341273,19-08-2015,,0,OnCurb,Alive,Good,Gleditsia triacanthos var. inermis,honeylocust,...,34,13,QN28,Jackson Heights,4030902,New York,4.075.662.595,-7.389.416.679,1.013.570.588,2.149.536.472
2,541347,325281,30-12-2015,,0,OnCurb,Alive,Good,Pyrus calleryana,Callery pear,...,32,10,QN76,Baisley Park,4028800,New York,4.067.977.677,-7.378.846.289,1.042.922.921,1.870.082.671
3,613930,203822,05-07-2016,,0,OnCurb,Alive,Good,Pyrus calleryana,Callery pear,...,46,22,BK31,Bay Ridge,3005000,New York,4.062.274.274,-740.375.434,9.738.278.722,1.661.605.847
4,18353,338911,13-06-2015,,0,OnCurb,Alive,Good,Prunus virginiana,'Schubert' chokecherry,...,31,10,QN12,Hammels-Arverne-Edgemere,4095400,New York,405.965.141,-7.379.762.248,1.040.452.463,1.566.675.017
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
683783,237788,223344,19-09-2015,,0,OnCurb,Alive,Poor,Prunus cerasifera,purple-leaf plum,...,51,25,BK33,Carroll Gardens-Columbia Street-Red Hook,3005300,New York,4.067.256.565,-7.401.147.322,9.810.673.856,1.843.104.162
683784,249489,335314,23-09-2015,,0,OnCurb,Dead,,,,...,33,11,QN44,Glen Oaks-Floral Park-New Hyde Park,4157903,New York,4.073.043.381,-737.106.004,1.064.458.182,2.055.257.957
683785,230261,230303,16-09-2015,,0,OnCurb,Dead,,,,...,44,17,BK42,Flatbush,3048200,New York,4.063.388.966,-7.396.977.907,9.926.379.934,1.702.209.185
683786,623784,318368,12-07-2016,,0,OnCurb,Alive,Good,Quercus rubra,northern red oak,...,31,10,QN55,South Ozone Park,4017800,New York,4.067.618.954,-7.381.313.534,1.036.082.454,1.856.857.796


In [3]:
tree_census1 = tree_census.withColumn(
    "health",
    when(
        (col("health").isNull()),
        "Desconhecida"
    ).otherwise(col("health")))
tree_census1.toPandas()

Unnamed: 0,tree_id,block_id,created_at,tree_dbh,stump_diam,curb_loc,status,health,spc_latin,spc_common,...,st_assem,st_senate,nta,nta_name,boro_ct,state,latitude,longitude,x_sp,y_sp
0,606945,305778,28-06-2016,,0,OnCurb,Alive,Good,Fraxinus pennsylvanica,green ash,...,25,14,QN37,Kew Gardens Hills,4125700,New York,4.072.433.932,-7.380.518.011,1.038.250.055,2.032.329.417
1,160321,341273,19-08-2015,,0,OnCurb,Alive,Good,Gleditsia triacanthos var. inermis,honeylocust,...,34,13,QN28,Jackson Heights,4030902,New York,4.075.662.595,-7.389.416.679,1.013.570.588,2.149.536.472
2,541347,325281,30-12-2015,,0,OnCurb,Alive,Good,Pyrus calleryana,Callery pear,...,32,10,QN76,Baisley Park,4028800,New York,4.067.977.677,-7.378.846.289,1.042.922.921,1.870.082.671
3,613930,203822,05-07-2016,,0,OnCurb,Alive,Good,Pyrus calleryana,Callery pear,...,46,22,BK31,Bay Ridge,3005000,New York,4.062.274.274,-740.375.434,9.738.278.722,1.661.605.847
4,18353,338911,13-06-2015,,0,OnCurb,Alive,Good,Prunus virginiana,'Schubert' chokecherry,...,31,10,QN12,Hammels-Arverne-Edgemere,4095400,New York,405.965.141,-7.379.762.248,1.040.452.463,1.566.675.017
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
683783,237788,223344,19-09-2015,,0,OnCurb,Alive,Poor,Prunus cerasifera,purple-leaf plum,...,51,25,BK33,Carroll Gardens-Columbia Street-Red Hook,3005300,New York,4.067.256.565,-7.401.147.322,9.810.673.856,1.843.104.162
683784,249489,335314,23-09-2015,,0,OnCurb,Dead,Desconhecida,,,...,33,11,QN44,Glen Oaks-Floral Park-New Hyde Park,4157903,New York,4.073.043.381,-737.106.004,1.064.458.182,2.055.257.957
683785,230261,230303,16-09-2015,,0,OnCurb,Dead,Desconhecida,,,...,44,17,BK42,Flatbush,3048200,New York,4.063.388.966,-7.396.977.907,9.926.379.934,1.702.209.185
683786,623784,318368,12-07-2016,,0,OnCurb,Alive,Good,Quercus rubra,northern red oak,...,31,10,QN55,South Ozone Park,4017800,New York,4.067.618.954,-7.381.313.534,1.036.082.454,1.856.857.796


In [4]:
tree_census2 = tree_census1.withColumn(
    "spc_latin",
    when(
        (col("spc_latin").isNull()),
        "Desconhecido"
    ).otherwise(col("spc_latin")))

In [5]:
tree_census3 = tree_census2.withColumn(
    "spc_common",
    when(
        (col("spc_common").isNull()),
        "Desconhecido"
    ).otherwise(col("spc_common")))

In [6]:
tree_census4 = tree_census3.withColumn(
    "steward",
    when(
        (col("steward").isNull()),
        "Desconhecido"
    ).otherwise(col("steward")))

In [7]:
tree_census5 = tree_census4.withColumn(
    "guards",
    when(
        (col("guards").isNull()),
        "Desconhecido"
    ).otherwise(col("guards")))

In [8]:
tree_census6 = tree_census5.withColumn(
    "sidewalk",
    when(
        (col("sidewalk").isNull()),
        "Desconhecido"
    ).otherwise(col("sidewalk")))

In [9]:
tree_census7 = tree_census6.withColumn(
    "problems",
    when(
        (col("problems").isNull()),
        "Desconhecido"
    ).otherwise(col("problems")))


In [10]:
tree_census8 = tree_census7.withColumn('mes_registo', split(tree_census7['created_at'], '-').getItem(1))
tree_census8.printSchema()

root
 |-- tree_id: integer (nullable = true)
 |-- block_id: integer (nullable = true)
 |-- created_at: string (nullable = true)
 |-- tree_dbh: date (nullable = true)
 |-- stump_diam: integer (nullable = true)
 |-- curb_loc: string (nullable = true)
 |-- status: string (nullable = true)
 |-- health: string (nullable = true)
 |-- spc_latin: string (nullable = true)
 |-- spc_common: string (nullable = true)
 |-- steward: string (nullable = true)
 |-- guards: string (nullable = true)
 |-- sidewalk: string (nullable = true)
 |-- user_type: string (nullable = true)
 |-- problems: string (nullable = true)
 |-- root_stone: string (nullable = true)
 |-- root_grate: string (nullable = true)
 |-- root_other: string (nullable = true)
 |-- trunk_wire: string (nullable = true)
 |-- trnk_light: string (nullable = true)
 |-- trnk_other: string (nullable = true)
 |-- brch_light: string (nullable = true)
 |-- brch_shoe: string (nullable = true)
 |-- brch_other: string (nullable = true)
 |-- address: str

In [11]:
spark.sql(
    """
    DROP TABLE IF EXISTS Projeto.new_york_tree_census_2015
    """
)

spark.sql(
    """
    DROP DATABASE IF EXISTS Projeto CASCADE
    """
)

DataFrame[]

In [12]:
spark.sql(
    """
    create database Projeto location 'hdfs://hdfs-nn:9000/trabalhoarvores/silver/Projeto.db'
    """
)

DataFrame[]

In [13]:
spark.sql(
    """
    DROP TABLE IF EXISTS Projeto.new_york_tree_census_2015
    """
)

spark.sql(
    """
     CREATE EXTERNAL TABLE Projeto.new_york_tree_census_2015 (
        tree_id INT,
        block_id INT,
        created_at TIMESTAMP,
        tree_dbh INT,
        stump_diam INT,
        curb_loc VARCHAR(50),
        status VARCHAR(50),
        health VARCHAR(50),
        spc_latin VARCHAR(50),
        spc_common VARCHAR(50),
        steward VARCHAR(50),
        guards VARCHAR(50),
        sidewalk VARCHAR(50),
        user_type VARCHAR(50),
        problems VARCHAR(50),
        root_stone VARCHAR(50),
        root_grate VARCHAR(50),
        root_other VARCHAR(50),
        trunk_wire VARCHAR(50),
        trnk_light VARCHAR(50),
        trnk_other VARCHAR(50),
        brch_light VARCHAR(50),
        brch_shoe VARCHAR(50),
        brch_other VARCHAR(50),
        address VARCHAR(50),
        zipcode INT,
        zip_city VARCHAR(50),
        cb_num INT,
        cncldist INT,
        st_assem INT,
        st_senate INT,
        nta VARCHAR(50),
        nta_name VARCHAR(50),
        boro_ct VARCHAR(50),
        state VARCHAR(50),
        latitude DOUBLE,
        longitude DOUBLE,
        x_sp DOUBLE,
        y_sp DOUBLE,
        mes_registo VARCHAR(50)
        
    )
    USING DELTA
    PARTITIONED BY (
        boroname VARCHAR(50)
    )
    LOCATION 'hdfs://hdfs-nn:9000/trabalhoarvores/silver/Projeto.db/new_york_tree_census_2015'
    """
)

DataFrame[]

In [14]:
tree_census8 \
    .select("tree_id", "block_id", "created_at", "tree_dbh", "stump_diam", "curb_loc", "status", "health", "spc_latin", "spc_common", "steward", 
            "guards", "sidewalk", "user_type", "problems", "root_stone", "root_grate", "root_other", "trunk_wire", "trnk_light", "trnk_other", "brch_light",
           "brch_shoe", "brch_other", "address", "zipcode", "zip_city", "cb_num", "borocode", "boroname", "cncldist", "st_assem", "st_senate", "nta",
           "nta_name", "boro_ct", "state", "latitude", "longitude", "x_sp", "y_sp", "mes_registo") \
    .write \
    .mode("overwrite") \
    .partitionBy("boroname") \
    .format("delta") \
    .save("hdfs://hdfs-nn:9000/trabalhoarvores/silver/Projeto.db/new_york_tree_census_2015/deltalake_table/")