
# [Sample Notebook] AfterWork: Data Analysis with Spark SQL

# Prerequisites

In [1]:
# Install PySpark from PyPI
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 3.8 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=1a58dcadfd4c179626b95af06ccd8c56c3c2d3066ad2a603496aff1139d9ae28
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
# Import SparkSession, the entry point for DataFrame and Spark SQL functionality
from pyspark.sql import SparkSession

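You can download all the datasets for this notebook [here](https://drive.google.com/file/d/1df4AAwSgWOrUH7zXSmO2RdDldJgW3uxf/view?usp=sharing). Upload the CSV files to the notebook's working directory so that the relative file names used in the cells below resolve.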

# 1. Creating DataFrames


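A Spark DataFrame is a distributed collection of rows organized into named columns, much like a table in a relational database. DataFrames let us analyze structured data efficiently, for example sales records, customer information, or log files. After registering a DataFrame as a temporary view with `createOrReplaceTempView()`, we can query it with SQL through `spark.sql()`.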



In [9]:
# Create a Spark session with an application name
spark = SparkSession.builder.appName('DataFrameExample').getOrCreate()

# Load data from the CSV file into a DataFrame: https://afterwork.ai/ds/e/information_technology_pafch.csv
df = spark.read.csv('information_technology_pafch.csv', header=True, inferSchema=True)

# Register the DataFrame as a temporary SQL view
df.createOrReplaceTempView('servers')

# Display the data
df.show(5)

# Stop the Spark session
spark.stop()

+-----------+------------+--------------------+-----------+------------+---------+------------------------+-------------+-----------+--------+
|Server Name|  IP Address|    Operating System|Memory (GB)|Storage (TB)|CPU Cores|Network Bandwidth (Gbps)|     Location|Environment|  Status|
+-----------+------------+--------------------+-----------+------------+---------+------------------------+-------------+-----------+--------+
|    Server1|192.168.1.10| Windows Server 2016|         32|           4|        8|                     1.0|     New York| Production|  Active|
|    Server2|192.168.1.11|        Ubuntu 20.04|         16|           2|        4|                     0.5|San Francisco|Development|  Active|
|    Server3|192.168.1.12|            CentOS 7|         64|           8|       16|                     2.0|      Chicago| Production|Inactive|
|    Server4|192.168.1.13|Red Hat Enterpris...|        128|          16|       32|                     4.0|       Dallas|    Staging|  Active|

## Challenge

Create a DataFrame using the dataset from the following URL: https://afterwork.ai/ds/ch/operations_y14ap.csv.  

In [7]:
# Create a Spark session with an application name
spark = SparkSession.builder.appName('DataFrameExample').getOrCreate()

# Load data from the CSV file into a DataFrame
df = spark.read.csv('operations_y14ap.csv', header=True, inferSchema=True)

# Register the DataFrame as a temporary SQL view
df.createOrReplaceTempView('servers')

# Display the data
df.show(5)

# Stop the Spark session
spark.stop()

+-----------+------------+--------------------+-----------+------------+---------+------------------------+-----------+-----------+--------+
|Server Name|  IP Address|    Operating System|Memory (GB)|Storage (TB)|CPU Cores|Network Bandwidth (Gbps)|   Location|Environment|  Status|
+-----------+------------+--------------------+-----------+------------+---------+------------------------+-----------+-----------+--------+
|    Server1|192.168.1.10| Windows Server 2016|         32|           4|        8|                     1.0|   New York| Production|  Active|
|    Server2|192.168.1.11|        Ubuntu 20.04|         16|           2|        4|                     0.5|Los Angeles|Development|  Active|
|    Server3|192.168.1.12|            CentOS 7|         64|           8|       16|                     2.0|    Chicago| Production|Inactive|
|    Server4|192.168.1.13|Red Hat Enterpris...|        128|          16|       32|                     4.0|      Miami|    Testing|  Active|
|    Server5|

# 2. Data Type Conversion

Data type conversion changes the type of a column in a DataFrame. For example, a numeric column stored as strings must be converted to a numeric type before we can do arithmetic on it; a common real-life case is financial data in which currency values arrive as text. In Spark SQL, we convert types explicitly with the `CAST(column AS type)` expression, or with the equivalent `Column.cast()` method in the DataFrame API.



In [10]:
# Create a Spark session with an application name
spark = SparkSession.builder.appName('Data_Type_Conversion').getOrCreate()

# Load data from the CSV file into a DataFrame: https://afterwork.ai/ds/e/servers_ame42.csv
df = spark.read.csv('servers_ame42.csv', header=True, inferSchema=True)

# Display the data
df.show(5)

+-----------+------------+--------------------+-----------+------------+---------+------------------------+-----------+-----------+--------+
|Server Name|  IP Address|    Operating System|Memory (GB)|Storage (TB)|CPU Cores|Network Bandwidth (Gbps)|   Location|Environment|  Status|
+-----------+------------+--------------------+-----------+------------+---------+------------------------+-----------+-----------+--------+
|    Server1|192.168.1.10| Windows Server 2016|         32|           4|        8|                     1.0|   New York| Production|  Active|
|    Server2|192.168.1.11|        Ubuntu 20.04|         16|           2|        4|                     0.5|Los Angeles|Development|Inactive|
|    Server3|192.168.1.12|            CentOS 7|         64|           8|       16|                     2.0|    Chicago| Production|  Active|
|    Server4|192.168.1.13|Red Hat Enterpris...|        128|          16|       32|                     4.0|      Miami|    Staging|  Active|
|    Server5|

In [11]:
# Register the DataFrame as a temporary SQL view
df.createOrReplaceTempView('servers_table')

# Perform the data type conversion with Spark SQL (backticks quote column names containing spaces)
converted_df = spark.sql("SELECT *, CAST(`Memory (GB)` AS INT) AS Memory_Int, CAST(`Storage (TB)` AS FLOAT) AS Storage_Float FROM servers_table")

# Display the converted data
converted_df.show()

# Stop the Spark session
spark.stop()

+-----------+------------+--------------------+-----------+------------+---------+------------------------+-------------+-----------+--------+----------+-------------+
|Server Name|  IP Address|    Operating System|Memory (GB)|Storage (TB)|CPU Cores|Network Bandwidth (Gbps)|     Location|Environment|  Status|Memory_Int|Storage_Float|
+-----------+------------+--------------------+-----------+------------+---------+------------------------+-------------+-----------+--------+----------+-------------+
|    Server1|192.168.1.10| Windows Server 2016|         32|           4|        8|                     1.0|     New York| Production|  Active|        32|          4.0|
|    Server2|192.168.1.11|        Ubuntu 20.04|         16|           2|        4|                     0.5|  Los Angeles|Development|Inactive|        16|          2.0|
|    Server3|192.168.1.12|            CentOS 7|         64|           8|       16|                     2.0|      Chicago| Production|  Active|        64|       

## Challenge

Given the dataset from the URL: https://afterwork.ai/ds/ch/servers_dig70.csv, write code to convert the 'Memory (GB)' column from string to integer data type in a DataFrame.


In [12]:
# Create a Spark session with an application name
spark = SparkSession.builder.appName('Data_Type_Conversion').getOrCreate()

# Load data from the CSV file into a DataFrame: https://afterwork.ai/ds/ch/servers_dig70.csv
df = spark.read.csv('servers_dig70.csv', header=True, inferSchema=True)

# Display the data
df.show(5)

+-----------+------------+--------------------+-----------+------------+---------+------------------------+-------------+-----------+--------+
|Server Name|  IP Address|    Operating System|Memory (GB)|Storage (TB)|CPU Cores|Network Bandwidth (Gbps)|     Location|Environment|  Status|
+-----------+------------+--------------------+-----------+------------+---------+------------------------+-------------+-----------+--------+
|    Server1|192.168.1.10| Windows Server 2016|         32|           4|        8|                     1.0|     New York| Production|  Active|
|    Server2|192.168.1.11|        Ubuntu 20.04|         16|           2|        4|                     0.5|San Francisco|Development|  Active|
|    Server3|192.168.1.12|            CentOS 7|         64|           8|       16|                     2.0|      Chicago| Production|Inactive|
|    Server4|192.168.1.13|Red Hat Enterpris...|        128|          16|       32|                     4.0|        Miami|    Staging|  Active|

In [14]:
# Register the DataFrame as a temporary SQL view
df.createOrReplaceTempView('servers_table')

# Perform the data type conversion with Spark SQL. Column names containing spaces must be
# quoted with backticks; single quotes ('Memory (GB)') create a string literal, which here
# casts to NULL instead of reading the column.
convert_df = spark.sql("SELECT *, CAST(`Memory (GB)` AS INT) AS Memory_Int, CAST(`Storage (TB)` AS FLOAT) AS Storage_flt FROM servers_table")

# Display the converted data
convert_df.show()

# Stop the Spark session
spark.stop()


+-----------+------------+--------------------+-----------+------------+---------+------------------------+-------------+-----------+--------+----------+-----------+
|Server Name|  IP Address|    Operating System|Memory (GB)|Storage (TB)|CPU Cores|Network Bandwidth (Gbps)|     Location|Environment|  Status|Memory_Int|Storage_flt|
+-----------+------------+--------------------+-----------+------------+---------+------------------------+-------------+-----------+--------+----------+-----------+
|    Server1|192.168.1.10| Windows Server 2016|         32|           4|        8|                     1.0|     New York| Production|  Active|        32|        4.0|
|    Server2|192.168.1.11|        Ubuntu 20.04|         16|           2|        4|                     0.5|San Francisco|Development|  Active|        16|        2.0|
|    Server3|192.168.1.12|            CentOS 7|         64|           8|       16|                     2.0|      Chicago| Production|Inactive|        64|        8.0|
|   

# 3. Handling Duplicates

Handling duplicates means identifying and removing rows that have identical values across all columns. Duplicates can arise from data entry errors, system issues, or merging multiple datasets, and if left in place they skew counts, sums, and averages. In PySpark, the DataFrame method `dropDuplicates()` removes rows that are identical across all columns (or across a subset of columns passed as a list); the SQL equivalent of full-row deduplication is `SELECT DISTINCT *`.



In [15]:
# Create a Spark session with an application name
spark = SparkSession.builder.appName('HandlingDuplicates').getOrCreate()

# Load data from the CSV file into a DataFrame: https://afterwork.ai/ds/e/sales_wmtgv.csv
df = spark.read.csv('sales_wmtgv.csv', header=True, inferSchema=True)

# Display the data
df.show(5)

+-----------+------------+--------------------+-----------+------------+---------+------------------------+-----------+-----------+--------+
|Server Name|  IP Address|    Operating System|Memory (GB)|Storage (TB)|CPU Cores|Network Bandwidth (Gbps)|   Location|Environment|  Status|
+-----------+------------+--------------------+-----------+------------+---------+------------------------+-----------+-----------+--------+
|    Server1|192.168.1.10| Windows Server 2016|         32|           4|        8|                     1.0|   New York| Production|  Active|
|    Server2|192.168.1.11|        Ubuntu 20.04|         16|           2|        4|                     0.5|Los Angeles|Development|Inactive|
|    Server3|192.168.1.12|            CentOS 7|         64|           8|       16|                     2.0|    Chicago| Production|  Active|
|    Server4|192.168.1.13|Red Hat Enterpris...|        128|          16|       32|                     4.0|      Miami|    Staging|Inactive|
|    Server5|

In [16]:
# Register the DataFrame as a temporary SQL view
df.createOrReplaceTempView('sales_data')

# Remove rows that are identical across all columns
filtered_df = spark.sql('SELECT * FROM sales_data').dropDuplicates()

# Display the deduplicated data
filtered_df.show()

# Stop the Spark session
spark.stop()

+-----------+------------+--------------------+-----------+------------+---------+------------------------+-------------+-----------+--------+
|Server Name|  IP Address|    Operating System|Memory (GB)|Storage (TB)|CPU Cores|Network Bandwidth (Gbps)|     Location|Environment|  Status|
+-----------+------------+--------------------+-----------+------------+---------+------------------------+-------------+-----------+--------+
|    Server4|192.168.1.13|Red Hat Enterpris...|        128|          16|       32|                     4.0|        Miami|    Staging|Inactive|
|   Server72|192.168.1.81|Red Hat Enterpris...|         16|           2|        4|                     0.5|   Long Beach|    Staging|Inactive|
|   Server46|192.168.1.55|            Debian 4|         16|           2|        4|                     0.5|      Phoenix|Development|Inactive|
|   Server26|192.168.1.35|Red Hat Enterpris...|         32|           4|        8|                     1.0|Oklahoma City|Development|Inactive|

## Challenge

Given the dataset from the URL: https://afterwork.ai/ds/ch/sales_7fdtc.csv, write code to handle duplicates by removing rows that have identical values across all columns. Use the 'dropDuplicates' function to achieve this.  


In [None]:
# Import the SparkSession class from the pyspark.sql module
from pyspark.sql import SparkSession

# Create a Spark session with an application name
spark = SparkSession.builder.appName('HandlingDuplicates').getOrCreate()

# Load data from the CSV file into a DataFrame: https://afterwork.ai/ds/ch/sales_7fdtc.csv
df = spark.read.csv('sales_7fdtc.csv', header=True, inferSchema=True)

# Display the data
df.show(5)

In [None]:
# Register the DataFrame as a temporary SQL view
# Write your code here

# Perform the data analysis technique to handle duplicates
# Write your code here

# Display the deduplicated data
# Write your code here

# Stop the Spark session
# Write your code here

# 4. Handling Missing Values

We handle missing values by identifying and dealing with any null or NaN values present in the dataset. Missing values can occur due to various reasons such as data entry errors, sensor malfunctions, or incomplete data collection. It is crucial to address missing values as they can impact the accuracy and reliability of our analysis. For example, in a dataset of customer feedback, missing values in the 'rating' column can skew our analysis of customer satisfaction.



In [None]:
# Create a Spark session with an application name
spark = SparkSession.builder.appName('HandleMissingValues').getOrCreate()

# Load data from the CSV file into a DataFrame: https://afterwork.ai/ds/e/operations_fx6ye.csv
df = spark.read.csv('operations_fx6ye.csv', header=True, inferSchema=True)

# Display the data
df.show(5)

In [None]:
# Register the DataFrame as a temporary SQL view
df.createOrReplaceTempView('servers')

# Handle missing values: keep only rows where both `CPU Cores` and `Storage (TB)` are present
df_cleaned = spark.sql('SELECT * FROM servers WHERE `CPU Cores` IS NOT NULL AND `Storage (TB)` IS NOT NULL')

# Display the filtered data
df_cleaned.show()

# Stop the Spark session
spark.stop()

# 5. Filtering Data

We filter data to extract only the rows that meet specific conditions. For example, we may want to filter a DataFrame to only include records where the sales amount is greater than $1000. A real-life use case for filtering data is in e-commerce analytics, where we can filter customer data to identify high-spending customers based on their purchase history.

In [None]:
# Create a Spark session with an application name
spark = SparkSession.builder.appName('SalesAnalysis').getOrCreate()

# Load data from the CSV file into a DataFrame: https://afterwork.ai/ds/e/sales_iaz2h.csv
df = spark.read.csv('sales_iaz2h.csv', header=True, inferSchema=True)

# Display the data
df.show(5)

In [None]:
# Register the DataFrame as a temporary SQL view
df.createOrReplaceTempView('sales')

# Perform the data analysis technique - Filtering data to extract only rows where Memory is greater than 64 GB
filtered_data = spark.sql('SELECT * FROM sales WHERE `Memory (GB)` > 64')

# Display the filtered data
filtered_data.show()

# Stop the Spark session
spark.stop()

## Challenge

Given the dataset from the URL: https://afterwork.ai/ds/ch/sales_9uorv.csv, write code to filter the DataFrame to only include records where the total price is greater than $1000.


In [None]:
# Create a Spark session with an application name
spark = SparkSession.builder.appName('SalesAnalysis').getOrCreate()

# Load data from the CSV file into a DataFrame: https://afterwork.ai/ds/ch/sales_9uorv.csv
df = spark.read.csv('sales_9uorv.csv', header=True, inferSchema=True)

# Display the data
df.show(5)

In [None]:
# Register the DataFrame as a temporary SQL view
# Write your code here

# Perform the data analysis technique - Filtering data to extract only rows where total price is greater than $1000
# Write your code here

# Display the filtered data
# Write your code here

# Stop the Spark session
spark.stop()

# 6. Sorting Data

We sort data to arrange it in a specific order based on one or more columns. This helps us organize the data in a meaningful way for analysis and presentation. For example, in a sales dataset, we can sort the data based on the sales amount in descending order to identify the top-selling products.



In [None]:
# Create a Spark session with an application name
spark = SparkSession.builder.appName('SortingDataExample').getOrCreate()

# Load data from the CSV file into a DataFrame: https://afterwork.ai/ds/e/operations_mtfg0.csv
df = spark.read.csv('operations_mtfg0.csv', header=True, inferSchema=True)

# Display the data
df.show(5)

In [None]:
# Register the DataFrame as a temporary SQL view
df.createOrReplaceTempView('operations_data')

# Perform the data analysis technique - Sorting data by 'Memory (GB)' column in descending order
sorted_data = spark.sql('SELECT * FROM operations_data ORDER BY `Memory (GB)` DESC')

# Display the sorted data
sorted_data.show()

# Stop the Spark session
spark.stop()

# 7. Aggregating Data

Aggregating data involves combining and summarizing multiple rows of data into a single result. For example, we can aggregate sales data to calculate the total revenue generated by each product category. A real-life use case of aggregating data is in financial analysis, where we can aggregate transaction data to calculate total expenses or revenue for a specific time period. To apply aggregation, we use functions like SUM, AVG, COUNT, and MAX along with the GROUP BY clause to group the data based on a specific column.



In [None]:
# Create a Spark session with an application name
spark = SparkSession.builder.appName('SalesAnalysis').getOrCreate()

# Load data from the CSV file into a DataFrame: https://afterwork.ai/ds/e/sales_ew6oq.csv
df = spark.read.csv('sales_ew6oq.csv', header=True, inferSchema=True)

# Display the data
df.show(5)

In [None]:
# Register the DataFrame as a temporary SQL view
df.createOrReplaceTempView('sales')

# Perform the data analysis technique - Aggregating data to calculate total storage (TB) per location
result = spark.sql('SELECT Location, SUM(`Storage (TB)`) AS Total_Storage FROM sales GROUP BY Location')

# Display the aggregated data
result.show()

# Stop the Spark session
spark.stop()

## Challenge

Given the dataset from the URL: https://afterwork.ai/ds/ch/sales_6on5e.csv, write a SQL query that finds the average price for each product category. Use the AVG aggregation function together with a GROUP BY clause.

In [None]:
# Create a Spark session with an application name
spark = SparkSession.builder.appName('SalesAnalysis').getOrCreate()

# Load data from the CSV file into a DataFrame: https://afterwork.ai/ds/ch/sales_6on5e.csv
df = spark.read.csv('sales_6on5e.csv', header=True, inferSchema=True)

# Display the data
df.show(5)

In [None]:
# Register the DataFrame as a temporary SQL view
# Write your code here

# Perform the data analysis technique - Aggregating data to calculate the average price for each product category
# Write your code here

# Display the aggregated data
# Write your code here

# Stop the Spark session
spark.stop()

# 8. Grouping Data by Multiple Columns

We group data by multiple columns when we want to analyze data based on multiple criteria. For example, in a sales dataset, we may want to group sales data by both region and product category to understand which regions perform best for each category. This allows us to perform aggregate functions on the grouped data and generate meaningful summaries for our analysis.



In [None]:
# Create a Spark session with an application name
spark = SparkSession.builder.appName('SalesAnalysis').getOrCreate()

# Load data from the CSV file into a DataFrame: https://afterwork.ai/ds/e/sales_pvaqb.csv
df = spark.read.csv('sales_pvaqb.csv', header=True, inferSchema=True)

# Display the data
df.show(5)

In [None]:
# Register the DataFrame as a temporary SQL view
df.createOrReplaceTempView('sales_data')

# Perform the data analysis technique by grouping data by multiple columns
grouped_data = spark.sql('SELECT Location, Environment, COUNT(*) AS TotalServers FROM sales_data GROUP BY Location, Environment')

# Display the grouped data
grouped_data.show()

# Stop the Spark session
spark.stop()