In [0]:
# Run configuration. Only tech_list should need maintenance: the later cells build the
# tech dimension and the list of archive folders to read from it, and build every
# GCS path from the processing date computed here.
from datetime import datetime, timedelta

# Process yesterday's partition (local clock). year/month/day are zero-padded strings
# taken from the YYYY-MM-DD key, so they can be dropped straight into the path templates.
yesterday = datetime.now() - timedelta(days=1)
datekey = yesterday.strftime('%Y-%m-%d')
# To backfill a specific day, override the key here, e.g.: datekey = '2018-10-24'
year, month, day = datekey.split('-')

# (tech_type, network_type, frequency) combinations to report on.
tech_list = [
    # 2G
    ('gsm', '2g', '900'),
    ('gsm', '2g', '1800'),
    # 3G
    ('umts', '3g', '900'),
    ('umts', '3g', '2100'),
    # 4G
    ('lte', '4g', '700'),
    ('lte', '4g', '800'),
    ('lte', '4g', '1800'),
    ('lte', '4g', '2100'),
    ('lte', '4g', '2600'),
]


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit

spark = SparkSession.builder.appName("MartechAssignment").getOrCreate()

# One dimension row per tech_list entry, plus the two pivot column names derived from it:
#   cell_cnt_value       -> 'site_<network_type>_cnt'         e.g. site_2g_cnt
#   frequency_band_value -> 'frequency_band_<T><frequency>'   e.g. frequency_band_G900
#                           (<T> = upper-cased first letter of tech_type)
# NOTE(review): two tech types sharing a first letter would yield clashing
# frequency_band_* column names — keep that in mind when extending tech_list.
tech_dimension_rows = [
    (tech, network, freq, f"site_{network}_cnt", f"frequency_band_{tech[0].upper()}{freq}")
    for tech, network, freq in tech_list
]
df_tech_dimension = spark.createDataFrame(
    tech_dimension_rows,
    ["tech_type", "network_type", "frequency", "cell_cnt_value", "frequency_band_value"],
)


In [0]:
# Site reference data for the processing date: semicolon-delimited CSV files with a
# header row; column types are inferred by Spark.
site_path = "gs://martech-archive-data/Archive/site/year={year}/month={mon}/day={day}/*.csv".format(year=year, mon=month, day=day)
df_site = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", ";")
    .option("inferSchema", "true")
    .load(site_path)
)
    


In [0]:
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Target schema of the combined per-technology cell data. Each technology's CSV is
# expected to provide the first six columns; tech_type is added from the folder name.
# NOTE(review): assumes the CSV header names match these field names — confirm.
schema = StructType([
    StructField("year", IntegerType(), True),
    StructField("month", IntegerType(), True),
    StructField("day", IntegerType(), True),
    StructField("cell_identity", StringType(), True),
    StructField("frequency_band", StringType(), True),
    StructField("site_id", StringType(), True),
    StructField("tech_type", StringType(), True)
])

# Start from an empty DataFrame with the schema above, so the result keeps this
# schema even when no technology has data for the day.
df_data = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)

# Merge every technology's data for the processing date into df_data.
tech_types = set(item[0] for item in tech_list)
for tech_type in tech_types:
    path = "gs://martech-archive-data/Archive/{type}/year={year}/month={mon}/day={day}/*.csv".format(type=tech_type, year=year, mon=month, day=day)

    try:
        df_tech_type = spark.read \
            .format("csv") \
            .option("header", "true") \
            .option("delimiter", ";") \
            .option("inferSchema", "true") \
            .load(path)
    except Exception as e:
        # Best effort: a technology whose data cannot be read (e.g. no files for the
        # day) is skipped, but say so instead of dropping it silently.
        print("Skipping {type}: could not read {path}: {err}".format(type=tech_type, path=path, err=e))
        continue

    # Tag each row with its technology in the "tech_type" column, then project onto the
    # schema by column *name* with explicit casts: union() matches columns by position,
    # so this keeps the merge independent of the CSV's column order and of inferSchema.
    df_tech_type = df_tech_type \
        .withColumn("tech_type", lit(tech_type)) \
        .select([col(field.name).cast(field.dataType) for field in schema.fields])
    df_data = df_data.union(df_tech_type)
        

In [0]:
# Expose the three inputs to Spark SQL under the names the pivot queries use.
for view_name, frame in (
    ("martech_archive_data", df_data),
    ("martech_archive_site", df_site),
    ("dim_tech_info", df_tech_dimension),
):
    frame.createOrReplaceTempView(view_name)

In [0]:
# Generate a dynamic SQL query giving, per site and day, the number of distinct cells
# for each network type (one output column per cell_cnt_value, e.g. site_2g_cnt).

# select() is a narrow projection, so collect() returns the dimension rows in the order
# they were built from tech_list; dict.fromkeys de-duplicates while keeping that order.
# (distinct() would shuffle and return the values — and thus the output columns — in
# an arbitrary order that can change between runs.)
dynamic_tech_cell_cnt = list(dict.fromkeys(
    row[0] for row in df_tech_dimension.select('cell_cnt_value').collect()
))
# An empty list would render "IN ()", which is invalid SQL.
assert dynamic_tech_cell_cnt, "tech_list must define at least one technology"

# Dynamically generate the COALESCE and PIVOT clauses.
coalesce_clauses = ",\n       ".join(
    f"COALESCE(SUM(aa.{cnt_col}), 0) AS {cnt_col}" for cnt_col in dynamic_tech_cell_cnt
)
pivot_values = ", ".join(f"'{cnt_col}'" for cnt_col in dynamic_tech_cell_cnt)

# The pivot yields one row per site and technology, with a single non-null
# site_<n>g_cnt column; the outer SUM collapses those rows into one per site. The RIGHT
# JOIN keeps every site from martech_archive_site (missing counts become 0). It joins on
# site_id only, since both inputs were read for the same year/month/day.
# NOTE(review): duplicate rows for a site in martech_archive_site would be summed more
# than once — confirm the site file has one row per site per day.
dynamic_tech_cell_cnt_sql = f"""
SELECT bb.year, bb.month, bb.day, bb.site_id,
       {coalesce_clauses}
FROM (
  SELECT *
  FROM (
      SELECT year, month, day, tech_type, site_id,
            COUNT(DISTINCT cell_identity) AS cell_cnt
      FROM martech_archive_data
      GROUP BY year, month, day, tech_type, site_id
  )  a 
  JOIN (
      SELECT tech_type, cell_cnt_value
        FROM dim_tech_info
       GROUP BY tech_type, cell_cnt_value
  )  b ON a.tech_type = b.tech_type
  PIVOT (
    SUM(cell_cnt) AS cell_cnt
    FOR cell_cnt_value IN ({pivot_values}) 
  )
) aa RIGHT JOIN martech_archive_site bb ON aa.site_id = bb.site_id
GROUP BY bb.year, bb.month, bb.day, bb.site_id
"""

df_tech_cell_cnt = spark.sql(dynamic_tech_cell_cnt_sql)

In [0]:
# Generate a dynamic SQL query giving, per site and day, a 0/1 flag for each
# technology/frequency combination (one column per frequency_band_value,
# e.g. frequency_band_G900).

# Same ordering concern as for the cell counts: collect() without distinct() keeps the
# tech_list order, so the output columns come out in a stable order on every run.
dynamic_frequency_band = list(dict.fromkeys(
    row[0] for row in df_tech_dimension.select('frequency_band_value').collect()
))
# An empty list would render "IN ()", which is invalid SQL.
assert dynamic_frequency_band, "tech_list must define at least one technology"

# Generate MAX(COALESCE(...)) clauses for the SELECT statement; sites with no matching
# data (NULL after the RIGHT JOIN) get 0.
max_coalesce_clauses = ",\n       ".join(
    f"MAX(COALESCE({band_col}, 0)) AS {band_col}" for band_col in dynamic_frequency_band
)

# Generate PIVOT clause values
pivot_values = ", ".join(f"'{band_col}'" for band_col in dynamic_frequency_band)

# A data row only survives the inner join when its frequency_band equals a configured
# frequency for its tech_type, so the pivoted flag is 1 where such a row exists. With
# a single pivot aggregate, Spark names each output column after the pivot value alone.
dynamic_frequency_band_sql = f"""
-- frequency_band 
SELECT bb.year, bb.month, bb.day, bb.site_id,
       -- frequency_band
       {max_coalesce_clauses}
FROM (
  SELECT *
    FROM (
        SELECT year, month, day, tech_type, site_id, frequency_band
        FROM martech_archive_data
    )  a 
    JOIN (
        SELECT tech_type, frequency, frequency_band_value
          FROM dim_tech_info
        GROUP BY tech_type, frequency, frequency_band_value
    )  b ON a.tech_type = b.tech_type AND a.frequency_band = b.frequency
    PIVOT (
      MAX(IF(frequency_band IS NOT NULL, 1, 0)) AS frequency_is_exist
      FOR frequency_band_value IN ({pivot_values}) 
    )
) aa RIGHT JOIN martech_archive_site bb ON aa.site_id = bb.site_id
GROUP BY bb.year, bb.month, bb.day, bb.site_id
"""

df_frequency_band_cnt = spark.sql(dynamic_frequency_band_sql)


In [0]:
# Combine the per-site cell counts and frequency-band flags into one row per site/day,
# then write the result to Google Cloud Storage as CSV with a header row.
# Both inputs are grouped by the site table's year/month/day/site_id; the full outer
# join keeps a key even if it appears on only one side.
join_conditions = ["year", "month", "day", "site_id"]
result_df = df_tech_cell_cnt.join(df_frequency_band_cnt, on=join_conditions, how="fullouter")

# NOTE: Spark treats this path as an output *directory* (named result.csv) and writes
# part files inside it; mode 'overwrite' replaces that directory on re-runs.
gcs_output_path = "gs://martech-archive-data/Result/year={year}/month={mon}/day={day}/result.csv".format(
    year=year, mon=month, day=day
)

(
    result_df.write
    .mode('overwrite')
    .option("header", True)
    .format('csv')
    .save(gcs_output_path)
)