### Command to print current working directory

In [30]:
import os
os.getcwd()

'E:\\Machine Learning A-Z_Download Codes and Datasets\\ML_Git'

### Command to move and copy files from one location to another location

In a raw string, backslashes (\) are treated as literal characters and are not interpreted as escape characters (such as \n for newline or \t for tab)

In [None]:
import shutil
shutil.move(r'E:\\Machine Learning A-Z_Download Codes and Datasets\\AWS\pyspark_intro.ipynb'\
            ,r'E:\\Machine Learning A-Z_Download Codes and Datasets\\ML_Git\pyspark_intro.ipynb')
#shutil.copy(src_path,dst_path)

### Pyspark Introduction

PySpark is a versatile, high-performance tool for anyone working with big data, offering a unified platform for data processing, analytics, machine learning, and real-time applications—all accessible through Python

Key Uses of PySpark

Big Data Processing:
PySpark excels at processing and transforming large volumes of structured, semi-structured, or unstructured data. It can handle data at any scale, from a single workstation to thousands of nodes.

Data Analysis:
You can perform SQL-like queries using PySpark SQL, which allows for interactive data exploration and analysis. PySpark DataFrames provide a familiar tabular interface, similar to Pandas or R, but with distributed computing power.

Machine Learning:
PySpark integrates with Spark MLlib, allowing users to build, train, and tune machine learning models on massive datasets. This includes tasks like regression, classification, clustering, and recommendation systems.

Streaming and Real-Time Analytics:
With Spark Streaming, PySpark can process real-time data streams, making it suitable for use cases like fraud detection, monitoring, and alerting.

Graph Processing:
PySpark supports graph analytics through libraries like GraphFrames, enabling efficient analysis of large-scale graph data (e.g., social networks, recommendation engines).

ETL (Extract, Transform, Load):
PySpark is often used to build ETL pipelines for data warehousing and data lake solutions. It supports reading from and writing to various data sources and formats, including CSV, Parquet, JSON, and Hive tables.

Integration with Other Tools:
PySpark can be used alongside popular Python libraries such as Pandas, NumPy, and scikit-learn, and integrates well with Jupyter Notebooks for interactive development.

Why Use PySpark?
Scalability:
Handles petabyte-scale data efficiently, far beyond the limits of single-machine tools.

Speed:
In-memory computation and optimized execution plans make PySpark significantly faster than disk-based frameworks like Hadoop MapReduce.

Fault Tolerance:
Built-in mechanisms for recovering from failures ensure reliability in distributed environments.

Flexibility:
Supports batch and streaming data, structured and unstructured formats, and a wide range of analytics and machine learning tasks

Installing Pyspark in Windows
1. Created Python virtual environment named 'my_venv'  cmd: python -m venv my_venv
2. activate the environment cmd: .\my_venv\Scripts\activate
3. install pyspark cmd: pip install pyspark
4. install terminal for pyspark cmd: pip install ipykernel
5. install the kernel cmd: python -m ipykernel install --user --name PYSPARK_KERNEL

In [31]:
import pyspark
from pyspark.sql import SparkSession
# if got the error of PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number. 
import os
os.environ['JAVA_HOME'] = r'C:\Users\sainadh\my_venv\Java\jdk-21'

In [32]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [33]:
# initializing spark session
spark=SparkSession.builder.getOrCreate()

file_path=os.getcwd()+r'\BigMart_Sales.csv'


In [34]:
# read data into pyspark with different file formats
df1 = spark.read.format('csv')\
                .option('infershcema',True)\
                .option('header',True)\
                .load(file_path)
# inferSchema=True tells PySpark to automatically detect (infer) the data types of each column in your CSV file.
df1.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: string (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: string (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: string (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: string (nullable = true)



In [35]:
# simple code to read the csv data
df = spark.read.csv(file_path,header=True)
df.printSchema() # returns schema of the dataframe

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: string (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: string (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: string (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: string (nullable = true)



In [36]:
# n- no.of rows to be  printed and vertical- Each row will be printed in vertical fashion
df.show(n=1, truncate=False, vertical=True) 

-RECORD 0--------------------------------------
 Item_Identifier           | FDA15             
 Item_Weight               | 9.3               
 Item_Fat_Content          | Low Fat           
 Item_Visibility           | 0.016047301       
 Item_Type                 | Dairy             
 Item_MRP                  | 249.8092          
 Outlet_Identifier         | OUT049            
 Outlet_Establishment_Year | 1999              
 Outlet_Size               | Medium            
 Outlet_Location_Type      | Tier 1            
 Outlet_Type               | Supermarket Type1 
 Item_Outlet_Sales         | 3735.138          
only showing top 1 row


In [37]:
# convert a small DataFrame to Pandas for richer display
import pandas as pd
df.limit(10).toPandas()

Unnamed: 0,Item_Identifier,Item_Weight,Item_Fat_Content,Item_Visibility,Item_Type,Item_MRP,Outlet_Identifier,Outlet_Establishment_Year,Outlet_Size,Outlet_Location_Type,Outlet_Type,Item_Outlet_Sales
0,FDA15,9.3,Low Fat,0.016047301,Dairy,249.8092,OUT049,1999,Medium,Tier 1,Supermarket Type1,3735.138
1,DRC01,5.92,Regular,0.019278216,Soft Drinks,48.2692,OUT018,2009,Medium,Tier 3,Supermarket Type2,443.4228
2,FDN15,17.5,Low Fat,0.016760075,Meat,141.618,OUT049,1999,Medium,Tier 1,Supermarket Type1,2097.27
3,FDX07,19.2,Regular,0.0,Fruits and Vegetables,182.095,OUT010,1998,,Tier 3,Grocery Store,732.38
4,NCD19,8.93,Low Fat,0.0,Household,53.8614,OUT013,1987,High,Tier 3,Supermarket Type1,994.7052
5,FDP36,10.395,Regular,0.0,Baking Goods,51.4008,OUT018,2009,Medium,Tier 3,Supermarket Type2,556.6088
6,FDO10,13.65,Regular,0.012741089,Snack Foods,57.6588,OUT013,1987,High,Tier 3,Supermarket Type1,343.5528
7,FDP10,,Low Fat,0.127469857,Snack Foods,107.7622,OUT027,1985,Medium,Tier 3,Supermarket Type3,4022.7636
8,FDH17,16.2,Regular,0.016687114,Frozen Foods,96.9726,OUT045,2002,,Tier 2,Supermarket Type1,1076.5986
9,FDU28,19.2,Regular,0.09444959,Frozen Foods,187.8214,OUT017,2007,,Tier 2,Supermarket Type1,4710.535


df.display() --> this command is used for rich display of df same like Pandas but it's only works with databricks application

In [38]:
# getting only few rows of the dataframe
df['Item_Identifier','Item_Weight','Item_Fat_Content'].show(n=5)

+---------------+-----------+----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|
+---------------+-----------+----------------+
|          FDA15|        9.3|         Low Fat|
|          DRC01|       5.92|         Regular|
|          FDN15|       17.5|         Low Fat|
|          FDX07|       19.2|         Regular|
|          NCD19|       8.93|         Low Fat|
+---------------+-----------+----------------+
only showing top 5 rows


### Schme Definitions
Manually defining data types of each columns in the dataframe
2 ways 1. DDL Schema 2. StructType() Schema

In [39]:
# DDL schema: we need to define for every column of the dataframe if we define for only one column it'll throw error as below 
# ParseException: [PARSE_SYNTAX_ERROR] Syntax error at or near end of input. SQLSTATE: 42601 (line 4, pos 0) 
my_ddl_schema= '''
                    Item_Identifier STRING,
                    Item_Weight STRING,
                    Item_Fat_Content STRING, 
                    Item_Visibility DOUBLE,
                    Item_Type STRING,
                    Item_MRP DOUBLE,
                    Outlet_Identifier STRING,
                    Outlet_Establishment_Year INT,
                    Outlet_Size STRING,
                    Outlet_Location_Type STRING, 
                    Outlet_Type STRING,
                    Item_Outlet_Sales DOUBLE
'''
df = spark.read.csv(file_path,header=True,schema=my_ddl_schema)
df.printSchema()

df.show(n=1,truncate=False,vertical=True)

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: string (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: integer (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)

-RECORD 0--------------------------------------
 Item_Identifier           | FDA15             
 Item_Weight               | 9.3               
 Item_Fat_Content          | Low Fat           
 Item_Visibility           | 0.016047301       
 Item_Type                 | Dairy             
 Item_MRP                  | 249.8092          
 Outlet_Identifier         | OUT049            
 Outlet_Establishment_Year | 1999              
 O

In [40]:
# structtype function contains structfield function, we need to define inside this (col_name,datatyoe as function,nullable=True/False)
# nullable means the column can have null values or not
my_struct_schema = StructType([ StructField('Item_Identifier',StringType(),True),
                              StructField('Item_Weight',DoubleType(),True), 
                              StructField('Item_Fat_Content',StringType(),True), 
                              StructField('Item_Visibility',StringType(),True), 
                              StructField('Item_MRP',StringType(),True), 
                              StructField('Outlet_Identifier',StringType(),True), 
                              StructField('Outlet_Establishment_Year',StringType(),True), 
                              StructField('Outlet_Size',StringType(),True), 
                              StructField('Outlet_Location_Type',StringType(),True), 
                              StructField('Outlet_Type',StringType(),True), 
                              StructField('Item_Outlet_Sales',StringType(),True)

])
# command to read data with customschema
df = spark.read.format('csv')\
                .option('header',True)\
                .schema(my_struct_schema)\
                .load(file_path)
df.printSchema()
df.show(n=1,vertical=True)

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: double (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: string (nullable = true)
 |-- Item_MRP: string (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: string (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: string (nullable = true)

-RECORD 0--------------------------------------
 Item_Identifier           | FDA15             
 Item_Weight               | 9.3               
 Item_Fat_Content          | Low Fat           
 Item_Visibility           | 0.016047301       
 Item_MRP                  | Dairy             
 Outlet_Identifier         | 249.8092          
 Outlet_Establishment_Year | OUT049            
 Outlet_Size               | 1999              
 Outlet_Location_Type      | Medium         

In [41]:
# if you try to define schema for a single column  it won't throw an error instead 
# it'll select the first column even though if we mention another column name and changes it's name and data as an example below
my_struct_schema=StructType([StructField('Item_Weight',StringType(),True)])
df1= spark.read.csv(file_path,header=True,schema=my_struct_schema)
df1.printSchema()
df1.show(n=1)

root
 |-- Item_Weight: string (nullable = true)

+-----------+
|Item_Weight|
+-----------+
|      FDA15|
+-----------+
only showing top 1 row


### Fetching few columns from the dataframe

In [42]:
# by using select function
df.select('Item_Identifier','Item_Weight','Item_Fat_Content').show(n=5)
# by simply performing slicing operation
df['Item_Identifier','Item_Weight','Item_Fat_Content'].show(n=5)

+---------------+-----------+----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|
+---------------+-----------+----------------+
|          FDA15|        9.3|         Low Fat|
|          DRC01|       5.92|         Regular|
|          FDN15|       17.5|         Low Fat|
|          FDX07|       19.2|         Regular|
|          NCD19|       8.93|         Low Fat|
+---------------+-----------+----------------+
only showing top 5 rows
+---------------+-----------+----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|
+---------------+-----------+----------------+
|          FDA15|        9.3|         Low Fat|
|          DRC01|       5.92|         Regular|
|          FDN15|       17.5|         Low Fat|
|          FDX07|       19.2|         Regular|
|          NCD19|       8.93|         Low Fat|
+---------------+-----------+----------------+
only showing top 5 rows


### Data Writing/Exporting the data

In [43]:
'''Py4JJavaError: An error occurred while calling o370.save.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems

it'll save as a folder of files with the output file and metadata as well
'''
os.environ['HADOOP_HOME'] = r'C:\Users\sainadh\my_venv\Hadoop\bin'

### Spark SQL
writing sql queries for the pyspark

In [None]:
# first we need to create temporary view
df.createTempView('my_view')

In [None]:
%sql

select * from my_view where Item_Fat_Content = 'Lf'

In [45]:
# command to run sql commands in the spark
df_sql = spark.sql("select * from my_view where Item_Fat_Content = 'Low Fat'")
df_sql.show(n=1,vertical=True)

-RECORD 0--------------------------------------
 Item_Identifier           | FDA15             
 Item_Weight               | 9.3               
 Item_Fat_Content          | Low Fat           
 Item_Visibility           | 0.016047301       
 Item_MRP                  | Dairy             
 Outlet_Identifier         | 249.8092          
 Outlet_Establishment_Year | OUT049            
 Outlet_Size               | 1999              
 Outlet_Location_Type      | Medium            
 Outlet_Type               | Tier 1            
 Item_Outlet_Sales         | Supermarket Type1 
only showing top 1 row
