In [1]:
import re
from datetime import datetime, timedelta

_DATE_FORMAT = "%Y-%m-%d"
# Index in this list == datetime.weekday() (0 = Monday ... 6 = Sunday).
_WEEKDAYS = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]
# "00".."24": "24" is accepted so a range can end at midnight ("22-24").
_VALID_HOURS = [str(i).zfill(2) for i in range(0, 25)]
_DAYS_PATTERN = re.compile(r'^[a-zA-Z]{3}(, ?[a-zA-Z]{3})*$')
_HOURS_PATTERN = re.compile(r'^\d{2}-\d{2}(, ?\d{2}-\d{2})*$')


def _parse_date(value, label):
    """Parse a "yyyy-mm-dd" string; `label` ("start"/"end") names the field in the error."""
    try:
        return datetime.strptime(value, _DATE_FORMAT)
    except ValueError as ve:
        raise ValueError('Invalid {0} date: {1}. Make sure it follows the expected format: yyyy-mm-dd and uses actual calendar dates.'.format(label, value)) from ve


def _parse_days(days):
    """Turn e.g. "mon, Wed" or "all" into a list of datetime.weekday() numbers."""
    if not _DAYS_PATTERN.match(days):
        raise ValueError('The input string for days: "{0}" is not in a valid format. Please provide as a string of comma separated days. Example: "mon, tue, wed"'.format(days))

    requested = [day.strip().lower() for day in days.split(',')]
    # Check every token first, so an unknown day is reported before misuse of "all".
    for day in requested:
        if day != "all" and day not in _WEEKDAYS:
            raise ValueError('The given day "{0}" is not a valid day. Please use the standard three letter abbreviations: mon, tue, wed, thu, fri, sat, sun, or all.'.format(day))

    if "all" in requested:
        if len(requested) > 1:
            raise ValueError('Use of "all" together with other specified days: {0}. Option "all" should be used alone.'.format(days))
        # Only the seven real weekdays (the old mapping also produced a bogus 7).
        return list(range(len(_WEEKDAYS)))

    return [_WEEKDAYS.index(day) for day in requested]


def _parse_hours(hours):
    """Expand e.g. "06-09, 13-15" into the sorted, zero-padded hours covered (end exclusive)."""
    if not _HOURS_PATTERN.match(hours):
        raise ValueError('The input string for hours: "{0}" is not in a valid format. Please provide as a string of comma separated ranges. Example: "06-09, 13-15"'.format(hours))

    hours_list = []
    for hour_range in (part.strip() for part in hours.split(',')):
        start, end = hour_range.split('-')
        # Validate the values before comparing them, so "30-10" is reported as an
        # invalid hour rather than as a wrong order.
        if start not in _VALID_HOURS or end not in _VALID_HOURS:
            raise ValueError('Make sure both start "{0}" and end "{1}" are valid hours of the day.'.format(start, end))
        if int(end) <= int(start):
            raise ValueError('Wrong order of hours: "{0}" The first hour in any range should be strictly smaller than the second'.format(hour_range))
        hours_list.extend(str(hour).zfill(2) for hour in range(int(start), int(end)))

    hours_list.sort()
    # A repeated hour can only come from two ranges sharing it.
    if len(hours_list) != len(set(hours_list)):
        raise ValueError("Make sure that the ranges in hours: {0} do not overlap".format(hours))
    return hours_list


def __check_input(start_date, end_date, days, hours):
    """Validate the query parameters and convert them into working types.

    Despite its name, this both validates and transforms the inputs. The name is
    kept for existing callers.

    Args:
        start_date: first date of the range, as a "yyyy-mm-dd" string.
        end_date: last date of the range (inclusive), as a "yyyy-mm-dd" string.
        days: comma separated three-letter weekday abbreviations, case-insensitive
            (e.g. "mon, tue"), or "all" on its own.
        hours: comma separated, non-overlapping "HH-HH" ranges with an exclusive
            end hour, e.g. "06-09, 13-15". "24" is allowed as an end hour.

    Returns:
        (start_date, end_date, days_list, hours_list), where:
        - the dates are ``datetime`` objects;
        - ``days_list`` holds ``datetime.weekday()`` numbers (0 = Monday .. 6 = Sunday);
        - ``hours_list`` is the sorted list of zero-padded hours ("00".."23")
          covered by the ranges.

    Raises:
        ValueError: if any parameter is malformed or inconsistent.
    """
    start_date = _parse_date(start_date, "start")
    end_date = _parse_date(end_date, "end")
    if end_date < start_date:
        raise ValueError('End date {0} is set earlier than start date {1}'.format(end_date.strftime(_DATE_FORMAT), start_date.strftime(_DATE_FORMAT)))

    days_list = _parse_days(days)
    hours_list = _parse_hours(hours)
    return start_date, end_date, days_list, hours_list

In [2]:
# https://stackoverflow.com/questions/74113035/fastest-way-to-search-many-files-in-many-directories
from concurrent.futures import ThreadPoolExecutor

def __process_directories(search_paths, files):
    """Find which of the wanted file names exist in the given directories.

    Each existing directory in `search_paths` is listed in a thread pool, and
    every entry whose name is in `files` is returned as a full path.

    Args:
        search_paths: directory paths to look in. Paths that are not existing
            directories are skipped.
        files: iterable of bare file names to look for.

    Returns:
        List of matching paths, grouped in the order of `search_paths`. A name
        found in several directories is returned once per directory.
    """
    # os is otherwise only imported in a later cell; keep this cell independent of run order.
    import os

    # Build a set once: O(1) membership, and safe if `files` is a one-shot
    # iterator (the listing threads would otherwise consume it).
    wanted = frozenset(files)
    directories = [path for path in search_paths if os.path.isdir(path)]

    def _matches_in(directory):
        """Full paths of the wanted files present directly in `directory`."""
        return [os.path.join(directory, name) for name in os.listdir(directory) if name in wanted]

    result = []
    with ThreadPoolExecutor() as executor:
        # executor.map yields results in the order of `directories`.
        for matches in executor.map(_matches_in, directories):
            result.extend(matches)
    return result

In [3]:
import os
import re

def __findData(start_date, end_date, days, hours, mind_path="/mnt/mind-data", lookahead_days=10):
    """Collect the paths of the hourly DW data files that match a query.

    Args:
        start_date: first date, "yyyy-mm-dd" (inclusive).
        end_date: last date, "yyyy-mm-dd" (inclusive).
        days: weekdays to include, e.g. "mon, wed" or "all".
        hours: hour ranges to include, e.g. "06-09, 13-15" (end hour exclusive).
        mind_path: root directory. The files are searched for in its
            sub-directories named by day as "yymmdd".
        lookahead_days: number of consecutive day directories searched for a
            date's files, starting with the date's own directory.

    Returns:
        List of full paths to files named "DW_<yyyymmdd>_<HH>.dat.gz".

    Raises:
        ValueError: propagated from `__check_input` for a malformed query.
    """
    start_date, end_date, weekdays, hour_list = __check_input(start_date, end_date, days, hours)

    # __check_input already expands "all" into every weekday number, so a plain
    # membership test covers both cases. The old `days == "all"` branch could
    # never match, because `days` is a list here.
    n_days = (end_date - start_date).days + 1
    dates = [start_date + timedelta(days=offset) for offset in range(n_days)]
    dates = [d for d in dates if d.weekday() in weekdays]

    paths = []
    for date in dates:
        # NOTE(review): a date's files are presumably stored in that day's directory
        # or one of the following ones (delayed delivery) — confirm the lookahead.
        search_directories = [
            os.path.join(mind_path, (date + timedelta(days=i)).strftime("%y%m%d"))
            for i in range(lookahead_days)
        ]
        files = ['DW_{0}_{1}.dat.gz'.format(date.strftime("%Y%m%d"), hour) for hour in hour_list]
        paths.extend(__process_directories(search_directories, files))

    return paths

In [4]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

def createSparkSession(appName = "calista-analytics", executor_instances = "20", executor_cores = "3", executor_memory = "12g", master = "spark://n1:7077"):
    """Build a Delta Lake enabled SparkSession for the standalone cluster.

    Args:
        appName: application name.
        executor_instances: value for "spark.executor.instances" (string).
        executor_cores: value for "spark.executor.cores" (string).
        executor_memory: value for "spark.executor.memory", e.g. "12g".
        master: Spark master URL.

    Returns:
        The session produced by ``getOrCreate()`` on the Delta-configured builder.
    """
    # TODO: check that the cluster is up and running; suggest ways to start it if not.
    # TODO: check whether another application is using cluster resources; suggest ways to stop it.

    # Applied to the builder in this order.
    settings = [
        ("spark.executor.instances", executor_instances),
        ("spark.executor.cores", executor_cores),
        ("spark.executor.memory", executor_memory),
        # Delta Lake SQL extension and catalog.
        ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
        ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
    ]

    builder = SparkSession.builder.appName(appName)
    for key, value in settings:
        builder = builder.config(key, value)
    builder = builder.master(master)

    # Adds the Delta Lake package to the builder before the session is created.
    return configure_spark_with_delta_pip(builder).getOrCreate()

In [5]:
# Shared session with the default cluster resources. `__ingest` reads from this global `spark`.
spark = createSparkSession()

:: loading settings :: url = jar:file:/home/xadmin/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/xadmin/.ivy2/cache
The jars for the packages stored in: /home/xadmin/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2e2b75bf-9630-4195-8dec-a192e51259f0;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.3.0 in central
	found io.delta#delta-storage;2.3.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
:: resolution report :: resolve 131ms :: artifacts dl 5ms
	:: modules in use:
	io.delta#delta-core_2.12;2.3.0 from central in [default]
	io.delta#delta-storage;2.3.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0

23/04/25 17:26:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/25 17:26:31 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [9]:
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    
    def __getSchema():
        """Return the Spark schema for the tab-separated DW files, in column order.

        USAGE_DTTM, SCRAMBLED_IMSI and Download_Type are read as strings; every
        other column is read as a nullable IntegerType.
        """
        # NOTE(review): IntegerType is 32-bit. If the volume/throughput counters can
        # exceed 2**31 - 1 they will not parse as integers — confirm their ranges
        # and consider LongType.
        string_columns = {"USAGE_DTTM", "SCRAMBLED_IMSI", "Download_Type"}
        column_names = (
            "USAGE_DTTM", "SCRAMBLED_IMSI", "Download_Type", "MCC", "MNC", "LAC", "SAC", "TAC",
            "E_NODE_B_ID", "SECTOR_ID", "RAT", "NEW_CALL_ATTEMPTS", "NEW_ANSWERED_CALLS",
            "NEW_FAILED_CALLS", "NEW_DROPPED_CALLS", "NEW_PS_ATTEMPTS", "NEW_FAILED_PS", "NEW_DROPPED_PS",
            "NEW_SMS_ATTEMPTS", "NEW_FAILED_SMS", "NEW_LU_ATTEMPTS", "NEW_FAILED_LU", "NEW_RAUTAU_ATTEMPTS",
            "NEW_FAILED_RAUTAU", "NEW_ATTACH_ATTEMPTS", "NEW_FAILED_ATTACH", "NEW_DETACH_ATTEMPTS",
            "NEW_DL_VOLUME", "NEW_DL_MAX_THROUGHPUT", "NEW_UL_VOLUME", "NEW_UL_MAX_THROUGHPUT",
        )

        return StructType([
            StructField(name, StringType() if name in string_columns else IntegerType(), True)
            for name in column_names
        ])

In [10]:
from pyspark.sql.functions import *
from tqdm import tqdm

def __ingest(files, batch_size=10, spark_session=None):
    """Load the DW files into Spark DataFrames, one DataFrame per batch of files.

    Args:
        files: list of file paths (e.g. from `__findData`). They are read as
            tab-separated text using `__getSchema()`.
        batch_size: maximum number of files per DataFrame; must be >= 1.
        spark_session: session to read with; defaults to the notebook's global `spark`.

    Returns:
        List of DataFrames, one per batch; an empty `files` gives ``[]``. Each
        DataFrame has the following columns:
        - `date` ("yyyy-MM-dd") and `time` ("HH:mm"), derived from USAGE_DTTM;
        - the remaining schema columns, in schema order.

    Raises:
        ValueError: if `batch_size` < 1. Previously, 0 crashed inside `range`
            and negative values silently returned ``[]``.

    Note:
        Spark reads are lazy, so the progress bar only tracks building the query
        plans, not reading the data.
    """
    # Explicit module alias instead of relying on `from pyspark.sql.functions import *`.
    from pyspark.sql import functions as F

    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer, got {0}".format(batch_size))
    session = spark if spark_session is None else spark_session

    schema = __getSchema()
    # Every column except the raw timestamp string, in schema order. `date`/`time` replace it.
    data_columns = [field.name for field in schema.fields if field.name != "USAGE_DTTM"]
    batches = [files[i:i + batch_size] for i in range(0, len(files), batch_size)]

    dfs = []
    for batch in tqdm(batches, desc="Processing batches"):
        # NOTE(review): USAGE_DTTM is assumed to look like "01MAY2022:09:00:00" — confirm.
        # The pattern needs four year letters ("yyyy"); "yyy" was a typo.
        df = (
            session.read.csv(batch, schema=schema, sep='\t')
            .withColumn("timestamp", F.to_timestamp("USAGE_DTTM", "ddMMMyyyy:HH:mm:ss"))
            .withColumn("date", F.date_format("timestamp", "yyyy-MM-dd"))
            .withColumn("time", F.date_format("timestamp", "HH:mm"))
            .select("date", "time", *data_columns)
        )
        dfs.append(df)

    return dfs

In [11]:
# Locating the files is slow on the first run. Once the data is cached, __findData takes only seconds.
import cProfile
import time
# To profile the file search instead:
# cProfile.run('__findData("2022-05-01", "2022-12-01", "mon", "09-12, 13-15")')

# Every Wednesday from 2022-01-01 to 2022-12-01, all 24 hours of the day.
files = __findData(start_date="2022-01-01", end_date="2022-12-01", days="wed", hours="00-24")

# NOTE: despite its name, `df` is a list of DataFrames, one per batch of files.
df = __ingest(files, batch_size=10)



Processing batches: 100%|██████████| 113/113 [00:07<00:00, 15.53it/s]


In [266]:
# Row count of the second batch only (`df` is a list of per-batch DataFrames), not of the whole query.
df[1].count()

                                                                                

551354102