In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySpark_Structured_API").config("spark.driver.memory","4g").config("spark.executor.memory","4g").getOrCreate()


In [27]:
spark

In [28]:
cores=spark._jsc.sc().getExecutorMemoryStatus().keySet().size()

In [29]:
cores

1

In [2]:
from pyspark.sql.functions import *

In [3]:
from pyspark.sql.types import StringType, StructField, StructType, IntegerType

In [4]:
data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

In [5]:
schema = StructType([
    StructField("First Name", StringType(), True),
    StructField("Middle Name", StringType(), True),
    StructField("Last Name", StringType(), True),
    StructField("Id", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Salary", IntegerType(), True)
])

In [6]:
nameList_df = spark.createDataFrame(data=data, schema=schema)

In [7]:
nameList_df.printSchema()

root
 |-- First Name: string (nullable = true)
 |-- Middle Name: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [8]:
nameList_df.show()

+----------+-----------+---------+-----+------+------+
|First Name|Middle Name|Last Name|   Id|Gender|Salary|
+----------+-----------+---------+-----+------+------+
|     James|           |    Smith|36636|     M|  3000|
|   Michael|       Rose|         |40288|     M|  4000|
|    Robert|           | Williams|42114|     M|  4000|
|     Maria|       Anne|    Jones|39192|     F|  4000|
|       Jen|       Mary|    Brown|     |     F|    -1|
+----------+-----------+---------+-----+------+------+



In [9]:
import os
print(os.getcwd())

C:\Learning\Python_Projects\PySpark


In [10]:
ls C:\Learning\Python_Projects\PySpark

 Volume in drive C is Windows
 Volume Serial Number is 2628-209D

 Directory of C:\Learning\Python_Projects\PySpark

12-06-2021  18:19    <DIR>          .
12-06-2021  18:19    <DIR>          ..
09-05-2021  23:40    <DIR>          .ipynb_checkpoints
20-11-2020  22:36       245,147,566 fire-incidents.csv
12-06-2021  17:49    <DIR>          output
09-05-2021  20:10            36,078 persons.json
09-05-2021  22:37    <DIR>          spark-warehouse
12-06-2021  18:19            25,970 StructuredAPI.ipynb
10-06-2021  12:17            39,307 StructuredOperations.ipynb
               4 File(s)    245,248,921 bytes
               5 Dir(s)  44,720,635,904 bytes free


In [11]:
filePath = "./fire-incidents.csv"

In [12]:
fire_incidents_df = (spark.read.format("csv")
                  .option("header", True)
                  .option("inferSchema", True)
                  .load(filePath))

In [13]:
fire_incidents_df1 = fire_incidents_df.select("IncidentNumber", "ExposureNumber", "ID", "ArrivalDtTm").show(10,truncate=False)

+--------------+--------------+---------+-----------------------+
|IncidentNumber|ExposureNumber|ID       |ArrivalDtTm            |
+--------------+--------------+---------+-----------------------+
|20104668      |0             |201046680|2020-09-11T00:58:28.000|
|20104708      |0             |201047080|2020-09-11T06:49:52.000|
|20104648      |0             |201046480|2020-09-10T22:29:12.000|
|20104598      |0             |201045980|2020-09-10T19:02:32.000|
|20104575      |0             |201045750|2020-09-10T18:04:57.000|
|20104477      |0             |201044770|2020-09-10T14:08:18.000|
|20104443      |0             |201044430|2020-09-10T12:35:19.000|
|20104605      |0             |201046050|2020-09-10T19:33:47.000|
|20104474      |0             |201044740|2020-09-10T13:48:52.000|
|20104652      |0             |201046520|2020-09-10T23:07:16.000|
+--------------+--------------+---------+-----------------------+
only showing top 10 rows



In [14]:
fire_incidents_df.printSchema()

root
 |-- IncidentNumber: string (nullable = true)
 |-- ExposureNumber: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- IncidentDate: string (nullable = true)
 |-- CallNumber: string (nullable = true)
 |-- AlarmDtTm: string (nullable = true)
 |-- ArrivalDtTm: string (nullable = true)
 |-- CloseDtTm: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZIPCode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- SuppressionUnits: integer (nullable = true)
 |-- SuppressionPersonnel: integer (nullable = true)
 |-- EMSUnits: integer (nullable = true)
 |-- EMSPersonnel: integer (nullable = true)
 |-- OtherUnits: integer (nullable = true)
 |-- OtherPersonnel: integer (nullable = true)
 |-- FirstUnitOnScene: string (nullable = true)
 |-- EstimatedPropertyLoss: integer (nullable = true)
 |-- EstimatedContentsLoss: string (nullable 

In [15]:
fire_incidents_timestamp_df = fire_incidents_df.withColumn("utc-time", to_utc_timestamp(current_timestamp(),"IST"))

In [16]:
fire_incidents_timestamp_df.select("IncidentNumber","utc-time").show(truncate=False)

+--------------+-----------------------+
|IncidentNumber|utc-time               |
+--------------+-----------------------+
|20104668      |2021-06-13 12:57:19.561|
|20104708      |2021-06-13 12:57:19.561|
|20104648      |2021-06-13 12:57:19.561|
|20104598      |2021-06-13 12:57:19.561|
|20104575      |2021-06-13 12:57:19.561|
|20104477      |2021-06-13 12:57:19.561|
|20104443      |2021-06-13 12:57:19.561|
|20104605      |2021-06-13 12:57:19.561|
|20104474      |2021-06-13 12:57:19.561|
|20104652      |2021-06-13 12:57:19.561|
|20104499      |2021-06-13 12:57:19.561|
|20104412      |2021-06-13 12:57:19.561|
|20104356      |2021-06-13 12:57:19.561|
|20104483      |2021-06-13 12:57:19.561|
|20104639      |2021-06-13 12:57:19.561|
|20104375      |2021-06-13 12:57:19.561|
|20104467      |2021-06-13 12:57:19.561|
|20104542      |2021-06-13 12:57:19.561|
|20104359      |2021-06-13 12:57:19.561|
|20104617      |2021-06-13 12:57:19.561|
+--------------+-----------------------+
only showing top

In [17]:
fire_incidents_timestamp_df.select("IncidentNumber","utc-time").orderBy("utc-time", ascending=False).show(truncate=False)

+--------------+-----------------------+
|IncidentNumber|utc-time               |
+--------------+-----------------------+
|20104668      |2021-06-13 12:57:19.757|
|20104708      |2021-06-13 12:57:19.757|
|20104648      |2021-06-13 12:57:19.757|
|20104598      |2021-06-13 12:57:19.757|
|20104575      |2021-06-13 12:57:19.757|
|20104477      |2021-06-13 12:57:19.757|
|20104443      |2021-06-13 12:57:19.757|
|20104605      |2021-06-13 12:57:19.757|
|20104474      |2021-06-13 12:57:19.757|
|20104652      |2021-06-13 12:57:19.757|
|20104499      |2021-06-13 12:57:19.757|
|20104412      |2021-06-13 12:57:19.757|
|20104356      |2021-06-13 12:57:19.757|
|20104483      |2021-06-13 12:57:19.757|
|20104639      |2021-06-13 12:57:19.757|
|20104375      |2021-06-13 12:57:19.757|
|20104467      |2021-06-13 12:57:19.757|
|20104542      |2021-06-13 12:57:19.757|
|20104359      |2021-06-13 12:57:19.757|
|20104617      |2021-06-13 12:57:19.757|
+--------------+-----------------------+
only showing top

In [33]:
fire_incidents_timestamp_df.select("SuppressionPersonnel").summary("count","min","max").show()

+-------+--------------------+
|summary|SuppressionPersonnel|
+-------+--------------------+
|  count|              534853|
|    min|                   0|
|    max|                5960|
+-------+--------------------+



In [35]:
fire_incident_altered_schema = fire_incidents_timestamp_df.withColumn("IncidentNumberSchemaChange", fire_incidents_timestamp_df['IncidentNumber']
                                       .cast(IntegerType()))

In [40]:
fire_incident_altered_schema.select("IncidentNumberSchemaChange").summary("count","max","min").show()

+-------+--------------------------+
|summary|IncidentNumberSchemaChange|
+-------+--------------------------+
|  count|                    534854|
|    max|                  20104708|
|    min|                   3000001|
+-------+--------------------------+



In [54]:
fire_incident_altered_schema.filter(fire_incident_altered_schema.IncidentNumberSchemaChange>6000001).\
                                select("IncidentNumberSchemaChange").summary("count","max","min").show(truncate=False)

+-------+--------------------------+
|summary|IncidentNumberSchemaChange|
+-------+--------------------------+
|count  |445788                    |
|max    |20104708                  |
|min    |6000005                   |
+-------+--------------------------+



In [58]:
fire_incident_altered_schema.select("ID","Address","IncidentDate").\
where(fire_incident_altered_schema.Address.like("%STREET")).\
show(25,False)

+---------+----------------------+-----------------------+
|ID       |Address               |IncidentDate           |
+---------+----------------------+-----------------------+
|201046680|MARIPOSA STREET       |2020-09-11T00:00:00.000|
|201047080|355 27TH STREET       |2020-09-11T00:00:00.000|
|201046480|2048 POLK STREET      |2020-09-10T00:00:00.000|
|201044770|75 DORE STREET        |2020-09-10T00:00:00.000|
|201046520|758 JACKSON STREET    |2020-09-10T00:00:00.000|
|201044990|3351 23RD STREET      |2020-09-10T00:00:00.000|
|201044120|15TH STREET           |2020-09-10T00:00:00.000|
|201043560|1401 MISSION STREET   |2020-09-10T00:00:00.000|
|201044830|1629 TAYLOR STREET    |2020-09-10T00:00:00.000|
|201046390|705 NATOMA STREET     |2020-09-10T00:00:00.000|
|201043750|1715 MCALLISTER STREET|2020-09-10T00:00:00.000|
|201044670|251-257 KEARNY STREET |2020-09-10T00:00:00.000|
|201045420|125 3RD STREET        |2020-09-10T00:00:00.000|
|201043600|1401 MISSION STREET   |2020-09-10T00:00:00.00

In [70]:
fire_incident_altered_schema.select("Address",fire_incident_altered_schema.Address.substr(0,5)).\
show(25,False)

+-------------------------+------------------------+
|Address                  |substring(Address, 0, 5)|
+-------------------------+------------------------+
|MARIPOSA STREET          |MARIP                   |
|355 27TH STREET          |355 2                   |
|2048 POLK STREET         |2048                    |
|501 COLLEGE AVENUE       |501 C                   |
|289 9TH AVENUE           |289 9                   |
|75 DORE STREET           |75 DO                   |
|550 EL CAMINO DEL MAR    |550 E                   |
|1107 GREAT HY            |1107                    |
|902 CORBETT AVENUE       |902 C                   |
|758 JACKSON STREET       |758 J                   |
|3351 23RD STREET         |3351                    |
|15TH STREET              |15TH                    |
|1401 MISSION STREET      |1401                    |
|1629 TAYLOR STREET       |1629                    |
|705 NATOMA STREET        |705 N                   |
|1715 MCALLISTER STREET   |1715               

In [80]:
fire_incident_altered_schema.select("Address").where(fire_incident_altered_schema.Address.like("%STREET")).\
show(5)

+------------------+
|           Address|
+------------------+
|   MARIPOSA STREET|
|   355 27TH STREET|
|  2048 POLK STREET|
|    75 DORE STREET|
|758 JACKSON STREET|
+------------------+
only showing top 5 rows



In [84]:
new_df = fire_incident_altered_schema.columns[0:5]
fire_incident_altered_schema.select(new_df).show(5,False)

+--------------+--------------+---------+------------------+-----------------------+
|IncidentNumber|ExposureNumber|ID       |Address           |IncidentDate           |
+--------------+--------------+---------+------------------+-----------------------+
|20104668      |0             |201046680|MARIPOSA STREET   |2020-09-11T00:00:00.000|
|20104708      |0             |201047080|355 27TH STREET   |2020-09-11T00:00:00.000|
|20104648      |0             |201046480|2048 POLK STREET  |2020-09-10T00:00:00.000|
|20104598      |0             |201045980|501 COLLEGE AVENUE|2020-09-10T00:00:00.000|
|20104575      |0             |201045750|289 9TH AVENUE    |2020-09-10T00:00:00.000|
+--------------+--------------+---------+------------------+-----------------------+
only showing top 5 rows



In [18]:
ouput_path = "./output/fire-incidents"
fire_incidents_df.write.format("parquet").mode("overwrite").save(ouput_path)

In [19]:
fire_incidents_df.select("IncidentNumber","Address").show(10, truncate=False)

+--------------+---------------------+
|IncidentNumber|Address              |
+--------------+---------------------+
|20104668      |MARIPOSA STREET      |
|20104708      |355 27TH STREET      |
|20104648      |2048 POLK STREET     |
|20104598      |501 COLLEGE AVENUE   |
|20104575      |289 9TH AVENUE       |
|20104477      |75 DORE STREET       |
|20104443      |550 EL CAMINO DEL MAR|
|20104605      |1107 GREAT HY        |
|20104474      |902 CORBETT AVENUE   |
|20104652      |758 JACKSON STREET   |
+--------------+---------------------+
only showing top 10 rows



In [20]:
fire_incidents_df.select("IncidentNumber","Address").withColumnRenamed("Address", "Address Renamed Column").show(10, False)

+--------------+----------------------+
|IncidentNumber|Address Renamed Column|
+--------------+----------------------+
|20104668      |MARIPOSA STREET       |
|20104708      |355 27TH STREET       |
|20104648      |2048 POLK STREET      |
|20104598      |501 COLLEGE AVENUE    |
|20104575      |289 9TH AVENUE        |
|20104477      |75 DORE STREET        |
|20104443      |550 EL CAMINO DEL MAR |
|20104605      |1107 GREAT HY         |
|20104474      |902 CORBETT AVENUE    |
|20104652      |758 JACKSON STREET    |
+--------------+----------------------+
only showing top 10 rows



In [21]:
lst = [("Ram",10,20),("Sai",20,30),("Karthik",30,40),("Ram",10,20),("Karthik",40,50)]

In [22]:
lst_df = spark.createDataFrame(lst, ["name","id","value"])

In [23]:
lst_df.show()

+-------+---+-----+
|   name| id|value|
+-------+---+-----+
|    Ram| 10|   20|
|    Sai| 20|   30|
|Karthik| 30|   40|
|    Ram| 10|   20|
|Karthik| 40|   50|
+-------+---+-----+



In [24]:
lst_df_no_dups= lst_df.drop_duplicates().sort("name",ascending=False)

In [25]:
lst_df.show()

+-------+---+-----+
|   name| id|value|
+-------+---+-----+
|    Ram| 10|   20|
|    Sai| 20|   30|
|Karthik| 30|   40|
|    Ram| 10|   20|
|Karthik| 40|   50|
+-------+---+-----+



In [26]:
lst_df_no_dups.show()

+-------+---+-----+
|   name| id|value|
+-------+---+-----+
|    Sai| 20|   30|
|    Ram| 10|   20|
|Karthik| 40|   50|
|Karthik| 30|   40|
+-------+---+-----+

