# Import libraries and set environment variables



In the first cell, we will set up the imports required to get AWS keys using the requests library and mount the S3 data into Databricks. Make sure that the environment supports the correct libraries for AWS interaction.


In [0]:
import requests

# URL of the JSON document that holds the AWS keys used in this notebook.
# NOTE(review): these credentials are fetched from a plain HTTPS URL with no
# authentication; the later cells use dbutils.secrets instead — consider
# moving this step to a secret scope as well.
aws_key_url = 'https://wcd-de-labs-files.s3.amazonaws.com/key.json'

# Fetch the AWS keys.
# - timeout: keeps the cell from hanging indefinitely on a stalled connection.
# - raise_for_status(): turns an HTTP error (403/404/5xx) into an immediate,
#   descriptive exception instead of a JSON-decode or KeyError further down.
response = requests.get(aws_key_url, timeout=30)
response.raise_for_status()
aws_keys = response.json()

# Extract the access key and secret access key from the JSON structure,
# which is indexed as {"key": [{"access_key": ..., "secret_access_key": ...}]}
ACCESS_KEY = aws_keys['key'][0]['access_key']
SECRET_KEY = aws_keys['key'][0]['secret_access_key']

# Set the AWS credentials on the Spark session for the s3a filesystem
spark.conf.set("fs.s3a.access.key", ACCESS_KEY)
spark.conf.set("fs.s3a.secret.key", SECRET_KEY)

# Confirm completion only — the key values themselves are deliberately not printed
print("AWS keys have been set.")


AWS keys have been set.


# Mount the S3 folder to Databricks


Next, we will mount the S3 bucket, specifically the folder cdr_by_grid_december, onto the Databricks /mnt/cdr_ds folder.

In [0]:
# Function to check if a mount point exists
def mount_point_exists(mount_point):
    """Return True if ``mount_point`` is currently registered as a DBFS mount.

    The check consults the workspace mount table (``dbutils.fs.mounts()``)
    rather than trying to list the path. Listing answers a different question:
    it succeeds for any ordinary DBFS directory that is not a mount, and it
    fails for a real mount whose backing storage cannot be read. Either case
    would make the caller's unmount/mount sequence fail.

    Args:
        mount_point: DBFS path of the mount, e.g. "/mnt/cdr_ds". A trailing
            slash is ignored.

    Returns:
        bool: True when a mount with that mount point exists, else False.
    """
    # Compare without trailing slashes so "/mnt/x" and "/mnt/x/" are equivalent
    wanted = mount_point.rstrip("/")
    return any(
        mount.mountPoint.rstrip("/") == wanted
        for mount in dbutils.fs.mounts()
    )

# A mount point can hold only one mount, so release /mnt/cdr_ds first if it is taken
if mount_point_exists("/mnt/cdr_ds"):
    dbutils.fs.unmount("/mnt/cdr_ds")

# Mount the December CDR subfolder of the S3 bucket at /mnt/cdr_ds,
# passing the s3a credentials explicitly for this mount
dbutils.fs.mount(
    source="s3a://weclouddata/datasets/telecom/CDR/cdr_by_grid_december",
    mount_point="/mnt/cdr_ds",
    extra_configs={
        "fs.s3a.access.key": ACCESS_KEY,
        "fs.s3a.secret.key": SECRET_KEY,
    },
)

# Sanity check: list what is now visible under the mount point
display(dbutils.fs.ls("/mnt/cdr_ds"))

/mnt/cdr_ds has been unmounted.


path,name,size,modificationTime
dbfs:/mnt/cdr_ds/sms-call-internet-mi-2013-12-01.txt,sms-call-internet-mi-2013-12-01.txt,298715145,1534382855000
dbfs:/mnt/cdr_ds/sms-call-internet-mi-2013-12-02.txt,sms-call-internet-mi-2013-12-02.txt,341919663,1534382855000
dbfs:/mnt/cdr_ds/sms-call-internet-mi-2013-12-03.txt,sms-call-internet-mi-2013-12-03.txt,353947238,1534382855000
dbfs:/mnt/cdr_ds/sms-call-internet-mi-2013-12-04.txt,sms-call-internet-mi-2013-12-04.txt,352032545,1534382855000
dbfs:/mnt/cdr_ds/sms-call-internet-mi-2013-12-05.txt,sms-call-internet-mi-2013-12-05.txt,353519447,1534382855000
dbfs:/mnt/cdr_ds/sms-call-internet-mi-2013-12-06.txt,sms-call-internet-mi-2013-12-06.txt,354028475,1534383133000
dbfs:/mnt/cdr_ds/sms-call-internet-mi-2013-12-07.txt,sms-call-internet-mi-2013-12-07.txt,307172220,1534383160000
dbfs:/mnt/cdr_ds/sms-call-internet-mi-2013-12-08.txt,sms-call-internet-mi-2013-12-08.txt,293663723,1534383165000
dbfs:/mnt/cdr_ds/sms-call-internet-mi-2013-12-09.txt,sms-call-internet-mi-2013-12-09.txt,338963272,1534383165000
dbfs:/mnt/cdr_ds/sms-call-internet-mi-2013-12-10.txt,sms-call-internet-mi-2013-12-10.txt,349269739,1534383165000


# Load the data for the first five days of December

Next, we will read the first five days from the folder and load them into a DataFrame.

In [0]:
# Build the mounted paths for 1–5 December; the day number is zero-padded
# to two digits to match the file names listed under /mnt/cdr_ds
cdrFiles = [
    f'/mnt/cdr_ds/sms-call-internet-mi-2013-12-{day:02}.txt'
    for day in range(1, 6)
]

# Load all five files as raw text (one row per input line, single `value` column)
df = spark.read.text(cdrFiles)

# Preview five rows at full width so the tab-separated fields stay visible
df.show(5, truncate=False)


+----------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                       |
+----------------------------------------------------------------------------------------------------------------------------+
|1\t1385852400000\t39\t0.11098916961424417\t0.16621436886121638\t0.10920185950874473\t0.16442705875571695\t13.648437920592805|
|1\t1385852400000\t46\t\t\t\t\t0.026137424264286602                                                                          |
|1\t1385853000000\t39\t0.16513682662061693\t0.1763994583739133\t0.030875085088185057\t0.02730046487718618\t13.330858194494864|
|1\t1385853600000\t0\t0.029087774982685617\t0.02730046487718618\t\t\t                                                        |
|1\t1385853600000\t39\t0.18645109168870494\t0.13658782275823106\t0.05460092975437236\t\t11.329552259939573     

# Define schema and apply it

Now we will define an explicit schema and apply it while reading the data. The column names are lowercase with hyphens, and each column's data type is declared in the schema rather than inferred.

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, FloatType

# Define the schema with the correct column names and types.
# All fields are nullable: the raw rows contain empty tab-separated cells,
# which are read as NULL.
# NOTE(review): FloatType is single precision, so values are rounded on read
# (the raw 13.648437920592805 is displayed as 13.6484375) — confirm this loss
# of precision is acceptable, or switch the activity columns to DoubleType.
schema = StructType([
    StructField("square-id", IntegerType(), True),
    StructField("time-interval", LongType(), True),
    StructField("country-code", IntegerType(), True),
    StructField("sms-in-activity", FloatType(), True),
    StructField("sms-out-activity", FloatType(), True),
    StructField("call-in-activity", FloatType(), True),
    StructField("call-out-activity", FloatType(), True),
    StructField("internet-traffic-activity", FloatType(), True)
])

# Read the five December files as tab-delimited, header-less CSV using the schema.
# `cdrFiles` is the path list built in the loading cell; the name `files` used
# here previously is not defined anywhere in the notebook and raises a
# NameError on a fresh "Run All".
df_cdr = spark.read.schema(schema).option('delimiter', '\t').csv(cdrFiles, header=False)

# Display the schema and first few rows
df_cdr.printSchema()
df_cdr.show(5, truncate=False)


root
 |-- square-id: integer (nullable = true)
 |-- time-interval: long (nullable = true)
 |-- country-code: integer (nullable = true)
 |-- sms-in-activity: float (nullable = true)
 |-- sms-out-activity: float (nullable = true)
 |-- call-in-activity: float (nullable = true)
 |-- call-out-activity: float (nullable = true)
 |-- internet-traffic-activity: float (nullable = true)

+---------+-------------+------------+---------------+----------------+----------------+-----------------+-------------------------+
|square-id|time-interval|country-code|sms-in-activity|sms-out-activity|call-in-activity|call-out-activity|internet-traffic-activity|
+---------+-------------+------------+---------------+----------------+----------------+-----------------+-------------------------+
|1        |1385852400000|39          |0.11098917     |0.16621436      |0.109201856     |0.16442706       |13.6484375               |
|1        |1385852400000|46          |NULL           |NULL            |NULL            |

#  Rename Columns

This cell renames the columns in the DataFrame by replacing hyphens ('-') with underscores ('_') for better consistency in column naming.

In [0]:
# Function to rename columns
def rename_columns(df):
    """Return ``df`` with every '-' in its column names replaced by '_'.

    Args:
        df: DataFrame-like object exposing ``columns`` and
            ``withColumnRenamed(old, new)``.

    Returns:
        The DataFrame produced by renaming each column in turn; the input
        object itself is not modified.
    """
    renamed = df
    # Iterate over the names of the input frame; each step renames one column
    for old_name in df.columns:
        new_name = old_name.replace('-', '_')
        renamed = renamed.withColumnRenamed(old_name, new_name)
    return renamed

# Apply the renaming function to the CDR DataFrame.
# The result is bound to a new name, so df_cdr (hyphenated names) stays available.
df_cdr_renamed = rename_columns(df_cdr)

# Display the new schema and first few rows to verify the renaming
# (column names should now use underscores, types unchanged)
df_cdr_renamed.printSchema()
df_cdr_renamed.show(5, truncate=False)


root
 |-- square_id: integer (nullable = true)
 |-- time_interval: long (nullable = true)
 |-- country_code: integer (nullable = true)
 |-- sms_in_activity: float (nullable = true)
 |-- sms_out_activity: float (nullable = true)
 |-- call_in_activity: float (nullable = true)
 |-- call_out_activity: float (nullable = true)
 |-- internet_traffic_activity: float (nullable = true)

+---------+-------------+------------+---------------+----------------+----------------+-----------------+-------------------------+
|square_id|time_interval|country_code|sms_in_activity|sms_out_activity|call_in_activity|call_out_activity|internet_traffic_activity|
+---------+-------------+------------+---------------+----------------+----------------+-----------------+-------------------------+
|1        |1385852400000|39          |0.11098917     |0.16621436      |0.109201856     |0.16442706       |13.6484375               |
|1        |1385852400000|46          |NULL           |NULL            |NULL            |

# Add SMS Ratio Column

This cell adds a new column called 'sms_ratio', which calculates the ratio of 'sms_in_activity' to 'sms_out_activity'.

In [0]:
from pyspark.sql.functions import col

# Function to add 'sms_ratio' column
def add_sms_ratio_column(df):
    """Return ``df`` with an extra 'sms_ratio' column.

    'sms_ratio' is 'sms_in_activity' divided by 'sms_out_activity', computed
    row by row as a column expression.

    Args:
        df: DataFrame containing 'sms_in_activity' and 'sms_out_activity'.

    Returns:
        A DataFrame with the additional 'sms_ratio' column.
    """
    inbound = col('sms_in_activity')
    outbound = col('sms_out_activity')
    return df.withColumn('sms_ratio', inbound / outbound)

# Apply the function to add the new column.
# NOTE(review): the date-column cell below starts again from df_cdr_renamed,
# so 'sms_ratio' is not carried into the later DataFrames — confirm that is intended.
df_cdr_with_sms_ratio = add_sms_ratio_column(df_cdr_renamed)

# Display the updated schema and first few rows to verify the new column
df_cdr_with_sms_ratio.printSchema()
df_cdr_with_sms_ratio.show(5, truncate=False)


root
 |-- square_id: integer (nullable = true)
 |-- time_interval: long (nullable = true)
 |-- country_code: integer (nullable = true)
 |-- sms_in_activity: float (nullable = true)
 |-- sms_out_activity: float (nullable = true)
 |-- call_in_activity: float (nullable = true)
 |-- call_out_activity: float (nullable = true)
 |-- internet_traffic_activity: float (nullable = true)
 |-- sms_ratio: double (nullable = true)

+---------+-------------+------------+---------------+----------------+----------------+-----------------+-------------------------+------------------+
|square_id|time_interval|country_code|sms_in_activity|sms_out_activity|call_in_activity|call_out_activity|internet_traffic_activity|sms_ratio         |
+---------+-------------+------------+---------------+----------------+----------------+-----------------+-------------------------+------------------+
|1        |1385852400000|39          |0.11098917     |0.16621436      |0.109201856     |0.16442706       |13.6484375       

# Create Date Column from Time Interval

This cell will convert the time-interval column to a timestamp and format it to yyyy/MM/dd.

In [0]:
from pyspark.sql.functions import col, from_unixtime, date_format

# Function to create a date column from time-interval
def add_date_column(dataframe):
    """Return ``dataframe`` with a 'date' column formatted as yyyy/MM/dd.

    'time_interval' is divided by 1000 before being passed to
    ``from_unixtime``, i.e. it is treated as epoch milliseconds.

    NOTE(review): ``from_unixtime`` renders the timestamp in the Spark session
    time zone, so the resulting calendar day depends on that setting — confirm
    which time zone the dates are meant to be expressed in.

    Args:
        dataframe: DataFrame containing a 'time_interval' column.

    Returns:
        A DataFrame with the additional string column 'date'.
    """
    epoch_seconds = col("time_interval") / 1000
    day_label = date_format(from_unixtime(epoch_seconds), "yyyy/MM/dd")
    return dataframe.withColumn("date", day_label)

# Apply the function to add the date column (built on df_cdr_renamed)
df_cdr_with_date = add_date_column(df_cdr_renamed)

# Show the updated DataFrame with the new date column.
# NOTE(review): the recorded output shows 2013/11/30 for the first rows even
# though they come from the 2013-12-01 file — presumably a session time zone
# effect; verify before relying on per-day aggregates.
df_cdr_with_date.select("date", "time_interval", "square_id", "sms_in_activity").show(5, truncate=False)


+----------+-------------+---------+---------------+
|date      |time_interval|square_id|sms_in_activity|
+----------+-------------+---------+---------------+
|2013/11/30|1385852400000|1        |0.11098917     |
|2013/11/30|1385852400000|1        |NULL           |
|2013/11/30|1385853000000|1        |0.16513683     |
|2013/11/30|1385853600000|1        |0.029087774    |
|2013/11/30|1385853600000|1        |0.18645109     |
+----------+-------------+---------+---------------+
only showing top 5 rows



# Calculate Summary Statistics at Square ID Level

This cell computes summary statistics for each square-id, aggregating various activity metrics and counting the total records.

In [0]:
from pyspark.sql import functions as F

# Function to calculate summary statistics at the square-id level
def calculate_summary_statistics(dataframe):
    """Aggregate activity metrics per 'square_id'.

    For each square the result contains the mean of the inbound and outbound
    SMS activity, the minimum outbound call activity, the maximum internet
    traffic activity, and the number of rows in the group.

    Args:
        dataframe: DataFrame with the underscore-named CDR columns.

    Returns:
        A DataFrame with one row per 'square_id' and the aggregated columns.
    """
    per_square_metrics = [
        F.mean("sms_in_activity").alias("mean_sms_in_activity"),
        F.mean("sms_out_activity").alias("mean_sms_out_activity"),
        F.min("call_out_activity").alias("min_call_out_activity"),
        F.max("internet_traffic_activity").alias("max_internet_traffic_activity"),
        # count("*") counts every row in the group, including rows with NULL metrics
        F.count("*").alias("total_records"),
    ]
    by_square = dataframe.groupBy("square_id")
    return by_square.agg(*per_square_metrics)

# Apply the function to calculate summary statistics (one row per square_id)
df_summary_stats = calculate_summary_statistics(df_cdr_with_date)

# Show the summary statistics DataFrame at full column width
# (show() without a row count prints Spark's default number of rows)
df_summary_stats.show(truncate=False)


+---------+--------------------+---------------------+---------------------+-----------------------------+-------------+
|square_id|mean_sms_in_activity|mean_sms_out_activity|min_call_out_activity|max_internet_traffic_activity|total_records|
+---------+--------------------+---------------------+---------------------+-----------------------------+-------------+
|1088     |0.11975984345077312 |0.08407897181748984  |0.020871513          |11.037694                    |1786         |
|1238     |0.49549087842597683 |0.42196868496654066  |0.028927796          |38.695675                    |2263         |
|1342     |0.3340133734136999  |0.2921593822840496   |0.07374897           |42.553158                    |1332         |
|148      |1.5042347660848234  |1.376296321345227    |0.085676275          |128.35962                    |2179         |
|1580     |0.415568866677146   |0.254975488859302    |0.0015186457         |34.00702                     |2206         |
|1591     |1.5094633196606042  |

# Find Min and Max Values Grouped by Square ID

This cell groups the DataFrame by square-id and calculates the minimum and maximum values for specified activity columns.

In [0]:
# Function to find min and max values grouped by square-id
def find_min_max_by_square_id(dataframe):
    """Compute the minimum and maximum of each activity metric per 'square_id'.

    For every metric column the result has a 'min_<column>' and a
    'max_<column>' column, in that order.

    Args:
        dataframe: DataFrame with the underscore-named CDR columns.

    Returns:
        A DataFrame with one row per 'square_id' and ten min/max columns.
    """
    # Listed in the order the output columns should appear
    metric_columns = (
        "sms_in_activity",
        "sms_out_activity",
        "internet_traffic_activity",
        "call_in_activity",
        "call_out_activity",
    )

    min_max_exprs = []
    for metric in metric_columns:
        min_max_exprs.append(F.min(metric).alias(f"min_{metric}"))
        min_max_exprs.append(F.max(metric).alias(f"max_{metric}"))

    return dataframe.groupBy("square_id").agg(*min_max_exprs)

# Apply the function to find min and max values (one row per square_id)
df_min_max_stats = find_min_max_by_square_id(df_cdr_with_date)

# Show the min and max values DataFrame at full column width
df_min_max_stats.show(truncate=False)


+---------+-------------------+-------------------+--------------------+--------------------+-----------------------------+-----------------------------+--------------------+--------------------+---------------------+---------------------+
|square_id|min_sms_in_activity|max_sms_in_activity|min_sms_out_activity|max_sms_out_activity|min_internet_traffic_activity|max_internet_traffic_activity|min_call_in_activity|max_call_in_activity|min_call_out_activity|max_call_out_activity|
+---------+-------------------+-------------------+--------------------+--------------------+-----------------------------+-----------------------------+--------------------+--------------------+---------------------+---------------------+
|1088     |0.020871513        |0.54749453         |0.020871513         |2.6005592           |0.020871513                  |11.037694                    |0.020871513         |0.8147374           |0.020871513          |1.1373159            |
|1238     |0.028927796        |2.236176 

# Create a summary table

##  Generate Aggregate Table of Activities by Country and Day

This cell creates an aggregate table summarizing the daily SMS, call, and internet activities per country, excluding null values from the counts.

In [0]:
# Function to generate an aggregate table summarizing activities by country and day
def aggregate_activities_by_country_and_day(dataframe):
    """Count non-null activity records per ('country_code', 'date').

    Each 'total_<column>' output column holds the number of rows in the group
    whose <column> value is not NULL. These are record counts, not sums of
    the activity values.

    Args:
        dataframe: DataFrame with 'country_code', 'date' and the five
            underscore-named activity columns.

    Returns:
        A DataFrame with one row per country/date pair and five count columns.
    """
    activity_columns = [
        "sms_in_activity",
        "sms_out_activity",
        "call_in_activity",
        "call_out_activity",
        "internet_traffic_activity",
    ]

    # when(...) without otherwise() yields NULL for non-matching rows,
    # and count() skips NULLs, so only non-null cells are counted
    non_null_counts = [
        F.count(F.when(F.col(name).isNotNull(), True)).alias(f"total_{name}")
        for name in activity_columns
    ]

    return dataframe.groupBy("country_code", "date").agg(*non_null_counts)

# Apply the function to create the aggregate table
# (one row per country_code/date pair, built from the date-enriched DataFrame)
df_activity_summary = aggregate_activities_by_country_and_day(df_cdr_with_date)

# Show the aggregate table of activities by country and day at full column width
df_activity_summary.show(truncate=False)


+------------+----------+---------------------+----------------------+----------------------+-----------------------+-------------------------------+
|country_code|date      |total_sms_in_activity|total_sms_out_activity|total_call_in_activity|total_call_out_activity|total_internet_traffic_activity|
+------------+----------+---------------------+----------------------+----------------------+-----------------------+-------------------------------+
|46          |2013/12/01|2903                 |2485                  |1057                  |1447                   |151943                         |
|976         |2013/12/01|70                   |40                    |6                     |0                      |0                              |
|503         |2013/12/01|995                  |316                   |329                   |1257                   |0                              |
|355         |2013/11/30|158                  |0                     |0                     |56     

## Write Summary Statistics DataFrame to Parquet Format

This cell writes the summary statistics DataFrame to the `/mnt/tmp` folder in Parquet format for efficient storage and access.

In [0]:
# Function to write DataFrame to Parquet format
def write_to_parquet(dataframe, path):
    """Write ``dataframe`` to ``path`` as Parquet, replacing existing output.

    Args:
        dataframe: DataFrame to persist.
        path: Destination path for the Parquet output.

    Returns:
        None.
    """
    # "overwrite" replaces whatever already exists at the destination
    overwrite_writer = dataframe.write.mode("overwrite")
    overwrite_writer.parquet(path)

# Destination for the Parquet output: a path under /mnt/tmp
parquet_path = "/mnt/tmp/summary_statistics.parquet"

# Call the function to write the summary statistics DataFrame to Parquet format
# (write_to_parquet uses overwrite mode, so re-running this cell replaces the output)
write_to_parquet(df_summary_stats, parquet_path)

# Output a confirmation message
print(f"Summary statistics DataFrame has been written to {parquet_path}.")


Summary statistics DataFrame has been written to /mnt/tmp/summary_statistics.parquet.


# Retrieve AWS Credentials from Databricks Secrets

In [0]:
# Accessing the AWS credentials stored in Databricks Secrets
# (scope "my_secret_scope"; the keys follow the standard AWS variable names).
# NOTE(review): configure_s3_access() below fetches the same two secrets itself,
# so these notebook-level variables look unused in the visible code — verify
# before removing.
access_key = dbutils.secrets.get(scope="my_secret_scope", key="AWS_ACCESS_KEY_ID")
secret_key = dbutils.secrets.get(scope="my_secret_scope", key="AWS_SECRET_ACCESS_KEY")


## Define Function to Set Up S3 Configuration

In [0]:
# Define Function to Set Up S3 Configuration
def configure_s3_access():
    """Configure the Spark session's s3a settings from Databricks Secrets.

    Reads the AWS access key and secret key from the "my_secret_scope" secret
    scope and sets them, together with the S3 endpoint, on ``spark.conf``.

    Returns:
        None.
    """
    secret_scope = "my_secret_scope"
    access_key = dbutils.secrets.get(scope=secret_scope, key="AWS_ACCESS_KEY_ID")
    secret_key = dbutils.secrets.get(scope=secret_scope, key="AWS_SECRET_ACCESS_KEY")

    # Applied in this order: credentials first, then the endpoint
    s3a_settings = (
        ("fs.s3a.access.key", access_key),
        ("fs.s3a.secret.key", secret_key),
        ("fs.s3a.endpoint", "s3.amazonaws.com"),
    )
    for setting_name, setting_value in s3a_settings:
        spark.conf.set(setting_name, setting_value)

## Define Function to Save DataFrame to S3

In [0]:
# Define Function to Save DataFrame to S3
def save_to_s3(dataframe, bucket_name, s3_path):
    """Write ``dataframe`` as Parquet to ``s3a://<bucket_name>/<s3_path>``.

    Existing output at the destination is overwritten, and a confirmation
    line with the full URI is printed afterwards.

    Args:
        dataframe: DataFrame to persist.
        bucket_name: Name of the target S3 bucket (without scheme).
        s3_path: Object key / prefix inside the bucket.

    Returns:
        None.
    """
    s3_uri = f"s3a://{bucket_name}/{s3_path}"

    overwrite_writer = dataframe.write.mode("overwrite")
    overwrite_writer.parquet(s3_uri)

    print(f"File saved to {s3_uri}")

## Main Execution

In [0]:
# Main Execution
# Target bucket and object key for the exported summary statistics
bucket_name = "crd-wcd-sunil" 
s3_path = "mnt/cdr/summary_statistics.parquet"  

# Configure S3 access (sets the s3a credentials and endpoint on the Spark session)
configure_s3_access()

# Load the Parquet file from the Databricks file system
# (the same path the summary statistics were written to earlier).
# NOTE(review): this rebinds `df`, which earlier held the raw text DataFrame —
# re-running earlier cells that expect the old `df` will see this one instead.
df = spark.read.parquet("/mnt/tmp/summary_statistics.parquet")

# Save the DataFrame to S3
save_to_s3(df, bucket_name, s3_path)


File saved to s3a://crd-wcd-sunil/mnt/cdr/summary_statistics.parquet


## Rank daily internet activity per country with a window function

Starting from the df_activity_summary DataFrame, use a window function partitioned by country_code to rank each day's total internet activity within its country.


In [0]:
from pyspark.sql.window import Window 
from pyspark.sql.functions import rank 

# Window over each country's rows, ordered by the internet-activity count.
# NOTE(review): orderBy defaults to ascending, so rank 1 is the day with the
# LOWEST count — confirm that is the intended direction.
wSpec3 = (
    Window
    .partitionBy("country_code")
    .orderBy("total_internet_traffic_activity")
)

# Attach the rank and preview the first 10 rows (country, day, rank only)
(
    df_activity_summary
    .withColumn("rank", rank().over(wSpec3))
    .select('country_code', 'date', 'rank')
    .show(10)
)

+------------+----------+----+
|country_code|      date|rank|
+------------+----------+----+
|           0|2013/11/30|   1|
|           0|2013/12/01|   2|
|           0|2013/12/04|   3|
|           0|2013/12/03|   4|
|           0|2013/12/02|   5|
|           0|2013/12/05|   6|
|           1|2013/11/30|   1|
|           1|2013/12/01|   2|
|           1|2013/12/05|   3|
|           1|2013/12/02|   4|
+------------+----------+----+
only showing top 10 rows

