# **Chapter 9: Distributed Computing**

This chapter (Sprint 9) focuses on local development with PySpark, covering data exploration, cleaning, transformation, and analysis.

## Initialization

This section handles PySpark initialization, module import, and data ingestion.

### Install PySpark

Install necessary packages.

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq

Set the environment variable for Java since Spark requires a JDK to run.

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Import the core PySpark modules and functions that will be used.

In [3]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, to_date, when, regexp_replace
from pyspark.sql.types import IntegerType, DateType

### Initialize PySpark Session

Initialize a PySpark session named *DrivenData Distributed Computing*.

In [4]:
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName(
	'DrivenData Distributed Computing').getOrCreate()

### Import Data

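Import the two CSV files containing synthetic data. The *header* option tells Spark to take the column names from the first row of each file, and *inferSchema* asks it to infer the column types from the data.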

In [5]:
df_14 = spark.read.csv('batch_2024-09-14.csv', header=True, inferSchema=True)
df_15 = spark.read.csv('batch_2024-09-15.csv', header=True, inferSchema=True)

### Data Join

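Combine the batch from the 14th with the batch from the 15th into a single DataFrame. Note that this step appends the rows of one batch to the other with *union*; it is not a key-based join on *unique_id*. Rows that share a *unique_id* are removed later, in the Data Cleaning step.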

In [6]:
df = df_14.union(df_15)
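
Because *union* matches columns by position rather than by name, it only behaves as intended when both batches have the same column order (PySpark also offers *unionByName* for name-based matching). The plain-Python sketch below illustrates the difference without a Spark session. The rows are made up, and the helpers `union_by_position` and `union_by_name` are hypothetical names for this illustration, not PySpark functions.

```python
# Plain-Python illustration of positional vs. name-based row appending.
# The rows and helper names are made up for illustration only.

def union_by_position(cols_a, rows_a, cols_b, rows_b):
    """Append rows_b under cols_a, ignoring cols_b (positional matching)."""
    return cols_a, rows_a + rows_b


def union_by_name(cols_a, rows_a, cols_b, rows_b):
    """Reorder each row of rows_b to follow cols_a before appending."""
    index = [cols_b.index(c) for c in cols_a]
    reordered = [tuple(row[i] for i in index) for row in rows_b]
    return cols_a, rows_a + reordered


cols_14 = ["unique_id", "email"]
rows_14 = [("id-1", "a@example.com")]
cols_15 = ["email", "unique_id"]  # same columns, different order
rows_15 = [("b@example.com", "id-2")]

_, positional = union_by_position(cols_14, rows_14, cols_15, rows_15)
_, by_name = union_by_name(cols_14, rows_14, cols_15, rows_15)

print(positional[1])  # values land under the wrong column names
print(by_name[1])     # values follow the column names
```

If the two batch files are known to share an identical header, positional matching is sufficient.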

## Data Processing

This section provides functionality for data processing.

### Data Exploration

Explore the dataset by displaying the schema, the first few rows, and general statistics.

In [7]:
# Show the schema of the dataset.
df.printSchema()

# Display the first 5 rows.
df.show(5)

# Display dataset summary.
df.describe().show()

root
 |-- person_name: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- personal_number: long (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- address: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- mac_address: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- iban: string (nullable = true)
 |-- accessed_at: timestamp (nullable = true)
 |-- session_duration: integer (nullable = true)
 |-- download_speed: integer (nullable = true)
 |-- upload_speed: integer (nullable = true)
 |-- consumed_traffic: integer (nullable = true)
 |-- unique_id: string (nullable = true)

+----------------+---------------+--------------------+---------------+----------+--------------------+------------+-----------------+--------------+--------------------+-------------------+----------------+--------------+------------+----------------+--------------------+
|     person_name|      user_name|      

### Data Cleaning

Handle missing values by replacing them with default values in the *email* and *phone* columns.

In [8]:
df = df.na.fill({"email": "unknown@example.com", "phone": "000-000-0000"})

Remove duplicates by using the *unique_id* column, which should be a unique identifier.

In [9]:
df = df.dropDuplicates(subset=["unique_id"])

Ensure that the *birth_date* and *session_duration* columns have the correct data types (DateType and IntegerType, respectively).

In [10]:
df = df.withColumn("birth_date", col("birth_date").cast(DateType()))
df = df.withColumn(
    "session_duration", col("session_duration").cast(IntegerType()))

### Data Filtering

Filter the dataset based on specific conditions, such as filtering for specific access dates and high data usage.

In [11]:
# Filter by access date (accessed after a specific date).
df_filtered = df.filter(to_date(df.accessed_at) > '2024-10-13')

# Filter by consumed traffic (greater than 1000), keeping the date filter above.
df_filtered = df_filtered.filter(df_filtered.consumed_traffic > 1000)
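
When both conditions are meant to apply together, a row has to pass the date check and the traffic check. The following is a minimal plain-Python sketch of that combined predicate, using the same cutoff date and traffic threshold as the cell above. The rows are made up, and `keep_row` is a hypothetical helper name.

```python
from datetime import date, datetime

CUTOFF = date(2024, 10, 13)  # same cutoff date as the cell above
MIN_TRAFFIC = 1000           # same traffic threshold as the cell above


def keep_row(accessed_at, consumed_traffic):
    """Return True only if the row passes both filter conditions."""
    if accessed_at is None or consumed_traffic is None:
        return False  # a missing value never satisfies a comparison
    return accessed_at.date() > CUTOFF and consumed_traffic > MIN_TRAFFIC


# Made-up rows: (accessed_at, consumed_traffic)
rows = [
    (datetime(2024, 10, 14, 9, 30), 5000),  # passes both checks
    (datetime(2024, 10, 14, 9, 30), 500),   # fails the traffic check
    (datetime(2024, 10, 12, 9, 30), 5000),  # fails the date check
]
kept = [row for row in rows if keep_row(*row)]
print(len(kept))
```

Both comparisons are strict, so a row accessed on the cutoff date itself, or with exactly 1000 units of traffic, is excluded.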

### Grouping and Aggregation

Group data by *person_name* and perform aggregation, calculating the average session duration and total consumed traffic.

In [12]:
df_grouped = df.groupBy("person_name").agg(
    {"session_duration": "avg", "consumed_traffic": "sum"})
df_grouped.show()

+-------------------+---------------------+---------------------+
|        person_name|sum(consumed_traffic)|avg(session_duration)|
+-------------------+---------------------+---------------------+
|         Jana Marin|              6905197|   20729.571428571428|
|       Flavia Marin|              3274078|              16704.0|
|      Patricia Stan|               843562|              25485.0|
|      Ludovica Toma|              1245476|              20400.0|
|   Olimpian Cristea|              1531586|               4091.0|
|     Betina Cristea|              2955150|   22036.333333333332|
|Ruxandra Dumitrescu|              6463578|    21640.14285714286|
|       Răzvan Suciu|              3203808|             15670.25|
|   Ionuț Diaconescu|              5887630|             13111.75|
|        Emil Nistor|              1924661|   19331.333333333332|
|         Dana Marin|              5756578|              17521.5|
|       Adelina Toma|              1637256|              33386.0|
|       Si
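
The same aggregation can be cross-checked on a small sample in plain Python. The sketch below uses made-up rows (not values from the dataset) and a hypothetical helper, `aggregate_by_name`. It also makes visible that grouping by *person_name* merges distinct users who happen to share a name.

```python
from collections import defaultdict


def aggregate_by_name(rows):
    """Return {person_name: (avg_session_duration, total_consumed_traffic)}."""
    durations = defaultdict(list)
    traffic = defaultdict(int)
    for name, session_duration, consumed_traffic in rows:
        durations[name].append(session_duration)
        traffic[name] += consumed_traffic
    return {
        name: (sum(values) / len(values), traffic[name])
        for name, values in durations.items()
    }


# Made-up rows: (person_name, session_duration, consumed_traffic)
sample = [
    ("Ana Pop", 100, 10),
    ("Ana Pop", 300, 30),
    ("Dan Ilie", 50, 5),
]
result = aggregate_by_name(sample)
print(result["Ana Pop"])
```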

### Data Transformation

Create a new column *total_bandwidth* as the sum of *download_speed* and *upload_speed*, and another column, *birth_year*, extracted from *birth_date*.

In [13]:
# Calculate total bandwidth.
df = df.withColumn(
    "total_bandwidth", col("download_speed") + col("upload_speed"))

# Extract birth year.
df = df.withColumn("birth_year", expr("year(birth_date)"))
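
In Spark SQL, adding a null to a number yields null, so *total_bandwidth* is null whenever either speed is missing. The plain-Python sketch below mirrors the two derived columns under that rule. The helper names and sample values are hypothetical and serve only as an illustration.

```python
from datetime import date


def total_bandwidth(download_speed, upload_speed):
    """Sum the two speeds, propagating a missing value as None."""
    if download_speed is None or upload_speed is None:
        return None
    return download_speed + upload_speed


def birth_year(birth_date):
    """Extract the year from a date, or None if the date is missing."""
    return None if birth_date is None else birth_date.year


print(total_bandwidth(300, 50))
print(total_bandwidth(300, None))
print(birth_year(date(1990, 5, 17)))
```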

### Data Segmentation

Categorize users based on their session activity levels (active, moderate, less active) using *session_duration*.

In [14]:
df = df.withColumn("activity_level",
                   when(col("session_duration") > 120, "active")
                   .when(col("session_duration").between(30, 120), "moderate")
                   .otherwise("less_active"))
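
The conditions are evaluated in order, *between* includes both bounds, and a missing *session_duration* satisfies neither condition, so it falls through to the default label. A minimal plain-Python sketch of the same rule follows, handy for checking boundary values; the function name `activity_level` is chosen here for illustration.

```python
def activity_level(session_duration):
    """Classify a session duration using the thresholds from the cell above."""
    if session_duration is not None and session_duration > 120:
        return "active"
    if session_duration is not None and 30 <= session_duration <= 120:
        return "moderate"
    # Values below 30 and missing values both end up in this branch.
    return "less_active"


for value in (121, 120, 30, 29, None):
    print(value, activity_level(value))
```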

### Data Anonymization

To protect personal data, mask *email* addresses by partially obscuring them.

In [15]:
df = df.withColumn("masked_email",
                   regexp_replace("email", "(\\w{3})\\w+@(\\w+)", "$1***@$2"))
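
The effect of this pattern can be explored locally with Python's `re` module. This is an approximation: `re` stands in for the Java regex engine that Spark uses, the replacement syntax differs (`\1` instead of `$1`), and `mask_email` is a hypothetical helper. The sketch shows that only the matched portion is rewritten, so some addresses are masked partially or not at all.

```python
import re

# Same pattern as the cell above; re.ASCII limits \w to [A-Za-z0-9_].
EMAIL_PATTERN = re.compile(r"(\w{3})\w+@(\w+)", re.ASCII)


def mask_email(email):
    """Keep three characters before the mask and the first domain label."""
    return EMAIL_PATTERN.sub(r"\1***@\2", email)


print(mask_email("johndoe@example.com"))     # joh***@example.com
print(mask_email("bob@example.com"))         # bob@example.com (unchanged)
print(mask_email("jane.smith@example.com"))  # jane.smi***@example.com
```

A local part with fewer than four word characters directly before the `@` does not match and is left unmasked, and text before a dot in the local part stays visible. A stricter pattern may be needed if every address must be obscured.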

## Data Analysis

This section provides functionality for data analysis.

### Session Analysis

Analyze session data, finding the average session duration and the longest session.

In [16]:
# Average session duration.
df.agg({"session_duration": "avg"}).show()

# Maximum session duration.
df.agg({"session_duration": "max"}).show()

+---------------------+
|avg(session_duration)|
+---------------------+
|   18023.480544147333|
+---------------------+

+---------------------+
|max(session_duration)|
+---------------------+
|                36000|
+---------------------+



### Network Analysis

Analyze data traffic usage based on *ip_address*, identifying devices with the highest data consumption.

In [17]:
df_ip_activity = df.groupBy("ip_address").agg(
    {"consumed_traffic": "sum"}).orderBy(
        "sum(consumed_traffic)", ascending=False)
df_ip_activity.show()

+---------------+---------------------+
|     ip_address|sum(consumed_traffic)|
+---------------+---------------------+
|   172.248.99.1|              1999989|
| 109.95.154.240|              1999961|
|  30.177.153.54|              1999954|
|  205.19.105.29|              1999934|
|  135.140.77.39|              1999910|
|   12.80.170.77|              1999902|
|   212.4.71.161|              1999881|
|115.193.211.245|              1999848|
| 147.103.125.27|              1999844|
|   42.27.69.130|              1999842|
|  161.49.80.180|              1999834|
| 178.33.213.243|              1999829|
| 11.117.248.174|              1999826|
|    4.215.6.241|              1999819|
|134.138.144.132|              1999812|
| 58.145.216.149|              1999797|
|   50.167.23.13|              1999781|
| 67.117.160.251|              1999772|
|   84.192.46.64|              1999772|
| 80.144.178.236|              1999761|
+---------------+---------------------+
only showing top 20 rows



### Time Series Analysis

Analyze user activity over time by extracting date information from *accessed_at* and grouping the data by day.

In [18]:
df_time = df.withColumn("access_date",
                        to_date("accessed_at")).groupBy("access_date").count()
df_time.show()

+-----------+-----+
|access_date|count|
+-----------+-----+
| 2024-06-12|  299|
| 2024-06-04|  276|
| 2024-02-05|  324|
| 2023-11-08|  260|
| 2024-08-27|  298|
| 2024-05-30|  277|
| 2023-11-22|  284|
| 2024-05-25|  229|
| 2023-09-19|  289|
| 2024-01-07|  285|
| 2023-12-10|  287|
| 2023-11-29|  246|
| 2024-08-30|  284|
| 2024-04-20|  295|
| 2024-07-08|  287|
| 2024-01-11|  263|
| 2023-09-27|  293|
| 2024-09-10|  285|
| 2023-11-25|  305|
| 2024-08-05|  310|
+-----------+-----+
only showing top 20 rows
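
The rows above appear in no particular order, because a grouped result is not sorted unless an ordering is requested. The plain-Python sketch below computes the same kind of daily count and sorts it by date. The timestamps are made up, and `daily_counts` is a hypothetical helper name.

```python
from collections import Counter
from datetime import datetime


def daily_counts(timestamps):
    """Count accesses per calendar day and return them sorted by date."""
    counts = Counter(ts.date() for ts in timestamps)
    return sorted(counts.items())


# Made-up access timestamps
sample = [
    datetime(2024, 6, 12, 8, 0),
    datetime(2024, 6, 4, 23, 59),
    datetime(2024, 6, 12, 17, 45),
]
for day, count in daily_counts(sample):
    print(day, count)
```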



## Export Data

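Export the processed DataFrame to CSV. Spark writes the result as a directory named *processed_data_2024-10-15.csv* that contains one or more part files, rather than as a single CSV file.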

In [19]:
df.write.csv("processed_data_2024-10-15.csv", header=True)