# Spark Notebook

@roman, pablo, javier

19 May, 2024

---
# Settings

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1716122928296_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# init spark session
# The kernel already started a session ("SparkSession available as 'spark'"),
# so getOrCreate() hands back that existing session instead of building a new one.
spark = (
    SparkSession.builder
    .appName('telecom')
    .getOrCreate()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# S3 locations used throughout the notebook
NAME = 'javier'                          # suffix of the personal analytics bucket
BUCKET = f"s3://itam-analytics-{NAME}"   # root bucket URI
FOLDER = 'telecom'                       # prefix holding the input parquet dataset

# NOTE: despite its name, SAVE_BUCKET is used as a folder (prefix) inside BUCKET
# by the write cells (f"{BUCKET}/{SAVE_BUCKET}/..."), not as a separate bucket.
SAVE_BUCKET = 'telecom-outputs'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

---
# Data

## S1: Read Data

In [4]:
# Load the raw telecom dataset (parquet files under BUCKET/FOLDER) ...
df_telecom = spark.read.format("parquet").load(f"{BUCKET}/{FOLDER}")

# ... and inspect the column names and types
df_telecom.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- report_id: long (nullable = true)
 |-- trigger_name: string (nullable = true)
 |-- result_date: timestamp_ntz (nullable = true)
 |-- received_date: timestamp_ntz (nullable = true)
 |-- received_date_local: string (nullable = true)
 |-- time_zone_name: string (nullable = true)
 |-- device_id: double (nullable = true)
 |-- device_model: string (nullable = true)
 |-- device_manufacturer: string (nullable = true)
 |-- device_model_raw: string (nullable = true)
 |-- device_manufacturer_raw: string (nullable = true)
 |-- device_brand_raw: string (nullable = true)
 |-- os_version: string (nullable = true)
 |-- app_version: string (nullable = true)
 |-- connection_type: double (nullable = true)
 |-- is_airplane_mode: string (nullable = true)
 |-- is_network_roaming: string (nullable = true)
 |-- is_international_roaming: string (nullable = true)
 |-- number_registered_networks: double (nullable = true)
 |-- number_unregistered_networks: double (nullable = true)
 |-- sim_operator_name

In [5]:
# Total number of rows in the raw dataset (count() is an action: it runs a full Spark job).
n_rows = df_telecom.count()
n_rows

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

22030647

In [6]:
# Data-quality check: number of rows whose "locality" is null (0 means fully populated).
df_telecom.filter(col("locality").isNull()).count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0

---
# Users Demographics

## S1: Where does each user live

In [7]:
# Home location per user: for each device_id, keep the (raw_sim_operator_name, postal_code)
# pair with the most records.
# NOTE(review): grouping also splits counts by raw_sim_operator_name, so a device that
# reports several operator names is ranked per (operator, postal_code) pair - confirm intent.
# row_number() picks an arbitrary winner among rows tied on `count`, which makes the chosen
# home change between runs; the extra sort keys make the pick deterministic (non-null
# postal codes preferred on ties).
df_user_location = (
    df_telecom
    .groupBy("device_id", "raw_sim_operator_name", "postal_code").count()
    .withColumn(
        "rank",
        F.row_number().over(
            Window.partitionBy("device_id").orderBy(
                col("count").desc(),
                col("postal_code").asc_nulls_last(),
                col("raw_sim_operator_name").asc_nulls_last(),
            )
        ),
    )
    .filter(col("rank") == 1)
    .drop("rank", "count")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# One representative coordinate per (device_id, postal_code, raw_sim_operator_name).
# dropDuplicates() keeps an arbitrary row per key, so the longitude/latitude could differ
# between runs. Instead, keep the most recent record (latest result_date), breaking any
# remaining ties on the coordinates themselves so the choice is reproducible.
df_lon_lat = (
    df_telecom
    .select(
        "device_id", "postal_code", "raw_sim_operator_name",
        "client_longitude", "client_latitude", "result_date",
    )
    .withColumn(
        "rank",
        F.row_number().over(
            Window.partitionBy("device_id", "postal_code", "raw_sim_operator_name").orderBy(
                col("result_date").desc_nulls_last(),
                col("client_longitude").asc_nulls_last(),
                col("client_latitude").asc_nulls_last(),
            )
        ),
    )
    .filter(col("rank") == 1)
    # result_date was only needed for ordering; output columns match the key + lon/lat
    .drop("rank", "result_date")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Attach the representative coordinates to each user's home location.
# Only the key columns of df_user_location are kept before joining, so re-running this
# cell yields the same frame (joining an already-joined frame again would duplicate
# client_longitude / client_latitude).
# NOTE(review): an inner equi-join drops rows whose key columns are null (e.g. a null
# postal_code) - confirm that is acceptable.
df_user_location = (
    df_user_location
    .select("device_id", "postal_code", "raw_sim_operator_name")
    .join(df_lon_lat, ["device_id", "postal_code", "raw_sim_operator_name"], "inner")
)

# show
df_user_location.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## S2: Where does each user connect

In [13]:
# Every distinct connection record: (device_id, raw_sim_operator_name, postal_code, result_date).
# Grouping by these columns and discarding the count is the same as distinct(), so the
# de-duplication is expressed directly; no ranking is needed because no row is filtered out.
df_user_connection = (
    df_telecom
    .select("device_id", "raw_sim_operator_name", "postal_code", "result_date")
    .distinct()
)

# show
df_user_connection.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+---------------------+-----------+-------------------+
|   device_id|raw_sim_operator_name|postal_code|        result_date|
+------------+---------------------+-----------+-------------------+
|4.38055655E8|                 BAIT|      72340|2024-04-01 22:40:46|
|4.64511631E8|                 BAIT|      53348|2024-04-01 03:13:41|
|4.52248118E8|                 BAIT|      86050|2024-04-01 17:57:48|
|4.56368721E8|                 BAIT|      52997|2024-04-02 06:26:29|
|4.46934763E8|                 BAIT|      54466|2024-04-01 13:53:29|
|4.66838394E8|              DALEFON|      20358|2024-04-01 04:52:42|
|4.39397068E8|                 BAIT|      20259|2024-04-01 22:11:08|
|4.60056803E8|              DALEFON|      72595|2024-04-02 03:06:28|
| 4.4498981E8|                 BAIT|      61312|2024-04-22 02:51:52|
|4.62917852E8|                PILLO|      72020|2024-04-22 01:03:08|
|3.93071691E8|               MEXFON|      76121|2024-04-22 06:55:02|
| 4.5838915E8|                 BAI

---
# Write

In [None]:
# Persist the home-location table as parquet under BUCKET/SAVE_BUCKET, replacing previous output.
(
    df_user_location.write
    .mode("overwrite")
    .parquet(f"{BUCKET}/{SAVE_BUCKET}/user_location")
)

In [None]:
# Persist the connection table as parquet under BUCKET/SAVE_BUCKET, replacing previous output.
(
    df_user_connection.write
    .mode("overwrite")
    .parquet(f"{BUCKET}/{SAVE_BUCKET}/user_connection")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

---
# Sandbox

In [None]:
# Count the number of distinct localities in the raw dataset.
df_telecom.select("locality").distinct().count()