### Import the required Spark classes

In [1]:
import warnings
warnings.filterwarnings('ignore')

In [3]:
from pyspark.sql import SparkSession

In [5]:
# Obtain the session used by every cell below: reuse an already-running
# SparkSession if there is one, otherwise start a new one under this app name.
spark = (
    SparkSession.builder
    .appName('DataFrameFromFile')
    .getOrCreate()
)

23/07/02 15:57:17 WARN Utils: Your hostname, systemd resolves to a loopback address: 127.0.1.1; using 192.168.0.141 instead (on interface wlp3s0)
23/07/02 15:57:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/02 15:57:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Creating DataFrame from Files

In [7]:
# Path to the fire-incidents CSV, relative to the notebook's working directory.
filepath = 'data/fire-incidents.csv'

In [8]:
# Build a CSV reader that treats the first line as column names and asks
# Spark to infer each column's type instead of reading everything as string.
read_csv = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
)

# Read the file into a DataFrame using the reader configured above.
fire_df = read_csv.load(filepath)

                                                                                

In [9]:
# Print each column's name, type and nullability; the types come from the
# inferSchema option set when the CSV was read.
fire_df.printSchema()

root
 |-- IncidentNumber: integer (nullable = true)
 |-- ExposureNumber: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Address: string (nullable = true)
 |-- IncidentDate: timestamp (nullable = true)
 |-- CallNumber: integer (nullable = true)
 |-- AlarmDtTm: timestamp (nullable = true)
 |-- ArrivalDtTm: timestamp (nullable = true)
 |-- CloseDtTm: timestamp (nullable = true)
 |-- City: string (nullable = true)
 |-- ZIPCode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- SuppressionUnits: integer (nullable = true)
 |-- SuppressionPersonnel: integer (nullable = true)
 |-- EMSUnits: integer (nullable = true)
 |-- EMSPersonnel: integer (nullable = true)
 |-- OtherUnits: integer (nullable = true)
 |-- OtherPersonnel: integer (nullable = true)
 |-- FirstUnitOnScene: string (nullable = true)
 |-- EstimatedPropertyLoss: integer (nullable = true)
 |-- EstimatedContentsLoss: d

In [10]:
# Project three columns and return only the first record as a Row object.
(
    fire_df
    .select(['IncidentNumber', 'IncidentDate', 'City'])
    .first()
)

Row(IncidentNumber=20104668, IncidentDate=datetime.datetime(2020, 9, 11, 0, 0), City='San Francisco')

In [11]:
# Render the first ten rows of the same three-column projection as a table.
(
    fire_df
    .select('IncidentNumber', 'IncidentDate', 'City')
    .show(n=10)
)

+--------------+-------------------+-------------+
|IncidentNumber|       IncidentDate|         City|
+--------------+-------------------+-------------+
|      20104668|2020-09-11 00:00:00|San Francisco|
|      20104708|2020-09-11 00:00:00|San Francisco|
|      20104648|2020-09-10 00:00:00|San Francisco|
|      20104598|2020-09-10 00:00:00|San Francisco|
|      20104575|2020-09-10 00:00:00|San Francisco|
|      20104477|2020-09-10 00:00:00|San Francisco|
|      20104443|2020-09-10 00:00:00|San Francisco|
|      20104605|2020-09-10 00:00:00|San Francisco|
|      20104474|2020-09-10 00:00:00|San Francisco|
|      20104652|2020-09-10 00:00:00|San Francisco|
+--------------+-------------------+-------------+
only showing top 10 rows



### Save the DataFrame in Parquet and JSON formats

In [16]:
# Destination directory for the Parquet export.
output_path_parquet = './data/output/fireincidents/parquet_format'

# Destination directory for the JSON export. The directory was previously
# named 'parquet_json' (a copy-paste slip); it now mirrors the Parquet
# directory's '<format>_format' naming so the folder name matches its contents.
output_path_json = './data/output/fireincidents/json_format'


`Parquet` is a columnar storage file format used in big data processing frameworks like Apache Spark and Apache Hadoop. It efficiently stores and compresses data, making it suitable for large-scale analytics workloads. It supports efficient predicate pushdown, schema evolution, and high-performance columnar scans.

In [19]:
# Write the DataFrame as Parquet files, replacing anything already present
# at the target directory.
fire_df.write.mode('overwrite').parquet(output_path_parquet)

23/07/02 16:04:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/07/02 16:04:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/07/02 16:04:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
23/07/02 16:04:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
23/07/02 16:04:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
23/07/02 16:04:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 58.46% for 13 writers
23/07/02 16:04:43 WARN MemoryManager: Total allocation exceeds 95.

In [20]:
# Write the DataFrame as JSON files, replacing anything already present
# at the target directory.
fire_df.write.mode('overwrite').json(output_path_json)

                                                                                

The end of the document