# About this document

See JIRA tickets:

- https://vodacom.atlassian.net/browse/BDSC1-3916
- https://vodacom.atlassian.net/browse/BDI-4928

See Confluence pages:
- https://vodacom.atlassian.net/l/cp/Z7Zxx0dT

In [1]:
source_csv_file = 'Priceincreasetelcodata_2024-01-18_12.02.50715.csv'

source_platform = 'psicle' # Specify the source platform, usually the source schema
source_subplatform = 'price_increase_data' # Specify the source subplatform, usually the source table
source = '{platform}.{subplatform}'.format(platform=source_platform, subplatform=source_subplatform)

proto_platform = 'dev_sce_analysis'
proto_subplatform = 'sm_{plat}_{subplat}'.format(plat=source_platform, subplat=source_subplatform) # Can be different, if it is, change it here

hive_platform = source_platform # Can be different, if it is, change it here
hive_subplatform = source_subplatform # Can be different, if it is, change it here

In [2]:
proto_subplatform

'sm_psicle_price_increase_data'

This spread sheet can contain numeris columns that could be exposed through the csv however to not expose all the columns, only the columns of interest have been listed below in the list of columns. This list can be changed upon the addition of new columns of interest that would be needed in this usecase

Two columns are being changed to relect the average rather than the max however the old columns will be phased out later:
- bh_max_cell_load -> bh_avg_max_cell_load
- bh_max_cell_load_forecast -> bh_avg_max_cell_load_forecast

# Initialise Spark context

In [3]:
def init_spark():
    import pyspark
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F

    conf = SparkConf().setAppName('{} ingestion'.format(source))
    conf.set("spark.driver.memory","10G")
    conf.set("spark.driver.cores","4")
    conf.set("spark.driver.maxResultSize", "10G")
    conf.set("spark.executor.instances", "2")
    conf.set("spark.executor.cores", "5")
    conf.set("spark.executor.memory", '5G')
    conf.set("spark.dynamicAllocation.enabled" , "true")
    conf.set("spark.yarn.executor.memoryOverhead", "25G")
    try:
        sc = pyspark.SparkContext(conf=conf)
        spark = SparkSession(sc)
        return spark
    except ValueError: # This happens when a SparkContext already exists and is running.
        pass
    


In [4]:
spark = init_spark()

#  CSV file import

 You have to load your CSV file into the HDFS first. To do this, go to the HDFS at this URL https://pbdphu2zafsrh.vodacom.corp:8888/filebrowser/ and upload the CSV file using the button.Read up on how Spark's CSV file format works, so you can get the layout right.  Try saving an Excel worksheet that has column headers as a CSV file.  If the headers are not strictly alphanumeric, then the conversion in the below loader will remove the other characters, leaving only the alphanumeric characters.  Excepting for this conversion, you'll get a table with the same number of columns and having columns names the same as the headers.

In [5]:
import re

#load_df = spark.read.csv('{csv_file}'.format(csv_file=source_csv_file), header=True, sep='\t')
#load_df = spark.read.format("csv").option("header", "true").load(source_csv_file)
load_df = spark.read.csv('{csv_file}'.format(csv_file=source_csv_file), header=True, sep=';', inferSchema= True)

In [6]:
 load_df.printSchema()

root
 |-- msisdn0: integer (nullable = true)
 |-- vas_code: string (nullable = true)
 |-- actual_premium: string (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- 1Yr_Loss_Ratio: string (nullable = true)
 |-- 3Yr_Loss_Ratio: string (nullable = true)
 |-- 1Yr_LR_band: string (nullable = true)
 |-- 3Yr_LR_band: string (nullable = true)
 |-- LR_Increase: string (nullable = true)
 |-- TP_2: string (nullable = true)
 |-- PSR_2: string (nullable = true)
 |-- PSR_2_Increase_Loading: string (nullable = true)
 |-- Premium Increase: string (nullable = true)
 |-- New_Premium_2: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- New_Premium_PSR: string (nullable = true)
 |-- monetary increase: string (nullable = true)
 |-- PolicyMakeMapped: string (nullable = true)
 |-- SI_Band: string (nullable = true)
 |-- Count_TH_LO: integer (nullable = true)
 |-- Count_AD_Repair: integer (nullable = true)
 |-- Gendr_Cd: string (nullable = true)
 |-- TenureGroup: string (nullable 

In [7]:
for old_col in load_df.columns:
    new_col = old_col.lower().replace(" ", "_")
    load_df = load_df.withColumnRenamed(old_col, new_col)

In [8]:
load_df.printSchema()

root
 |-- msisdn0: integer (nullable = true)
 |-- vas_code: string (nullable = true)
 |-- actual_premium: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- 1yr_loss_ratio: string (nullable = true)
 |-- 3yr_loss_ratio: string (nullable = true)
 |-- 1yr_lr_band: string (nullable = true)
 |-- 3yr_lr_band: string (nullable = true)
 |-- lr_increase: string (nullable = true)
 |-- tp_2: string (nullable = true)
 |-- psr_2: string (nullable = true)
 |-- psr_2_increase_loading: string (nullable = true)
 |-- premium_increase: string (nullable = true)
 |-- new_premium_2: string (nullable = true)
 |-- segment: string (nullable = true)
 |-- new_premium_psr: string (nullable = true)
 |-- monetary_increase: string (nullable = true)
 |-- policymakemapped: string (nullable = true)
 |-- si_band: string (nullable = true)
 |-- count_th_lo: integer (nullable = true)
 |-- count_ad_repair: integer (nullable = true)
 |-- gendr_cd: string (nullable = true)
 |-- tenuregroup: string (nullable 

In [9]:
#load_df = load_df.drop("msisdn0")
load_df = load_df.drop("msisdn43")
load_df = load_df.drop("policyvascode")
load_df = load_df.drop("policyinceptiondate2")
load_df = load_df.drop("policyinceptiondate35")
load_df = load_df.drop("thlo_acpc_expected")
load_df = load_df.drop("ad_repair_acpc_expected")
load_df = load_df.drop("ad_replace_acpc_expected")
load_df = load_df.drop("policyym")
load_df = load_df.drop("monthlypremium")
load_df = load_df.drop("age")
load_df = load_df.drop("lsm2")
load_df = load_df.drop("upgrade_date")
load_df = load_df.drop("target_date_of_increase")
load_df = load_df.drop("policymakesiband")
load_df = load_df.drop("gendr_cd")

In [10]:
load_df = load_df.withColumnRenamed("target_date_of_increase_", "target_date_of_increase")
load_df = load_df.withColumnRenamed("policymakemapped", "policy_make_mapped")
load_df = load_df.withColumnRenamed("insuredvalue", "insured_value")
load_df = load_df.withColumnRenamed("perilfilter", "peril_filter")
load_df = load_df.withColumnRenamed("tenuregroup", "tenure_group")
load_df = load_df.withColumnRenamed("mapage", "map_age")
load_df = load_df.withColumnRenamed("1yr_loss_ratio", "one_yr_loss_ratio")
load_df = load_df.withColumnRenamed("3yr_loss_ratio", "three_yr_loss_ratio")
load_df = load_df.withColumnRenamed("1yr_lr_band", "one_yr_lr_band")
load_df = load_df.withColumnRenamed("3yr_lr_band", "three_yr_lr_band")
load_df = load_df.withColumnRenamed("1yrall", "one_yr_all")
load_df = load_df.withColumnRenamed("3yrall", "three_yr_all")
load_df = load_df.withColumnRenamed("5yrall", "five_yr_all")
load_df = load_df.withColumnRenamed("1yrclaims", "one_yr_claims")
load_df = load_df.withColumnRenamed("3yrclaims", "three_yr_claims")
load_df = load_df.withColumnRenamed("5yrclaims", "five_yr_claims")
load_df = load_df.withColumnRenamed("ad_lossofsi", "ad_loss_of_si")

In [11]:
load_df.printSchema()

root
 |-- msisdn0: integer (nullable = true)
 |-- vas_code: string (nullable = true)
 |-- actual_premium: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- one_yr_loss_ratio: string (nullable = true)
 |-- three_yr_loss_ratio: string (nullable = true)
 |-- one_yr_lr_band: string (nullable = true)
 |-- three_yr_lr_band: string (nullable = true)
 |-- lr_increase: string (nullable = true)
 |-- tp_2: string (nullable = true)
 |-- psr_2: string (nullable = true)
 |-- psr_2_increase_loading: string (nullable = true)
 |-- premium_increase: string (nullable = true)
 |-- new_premium_2: string (nullable = true)
 |-- segment: string (nullable = true)
 |-- new_premium_psr: string (nullable = true)
 |-- monetary_increase: string (nullable = true)
 |-- policy_make_mapped: string (nullable = true)
 |-- si_band: string (nullable = true)
 |-- count_th_lo: integer (nullable = true)
 |-- count_ad_repair: integer (nullable = true)
 |-- tenure_group: string (nullable = true)
 |-- most_use

#  Scheduling

- Scheduling Strategy: CRON driven

In [21]:
 load_df.show()

+--------+--------------+------+-----------------+-------------------+--------------+----------------+-----------+-----------+-----------+----------------------+----------------+-------------+-----------------+---------------+-----------------+------------------+--------------+-----------+---------------+------------+----------------+------------------------------+----------------+------------+---------------+----------------+-----------+----------+------------+-----------+-------------+---------------+--------------+---------+--------------+-------------+--------+
|vas_code|actual_premium|tenure|one_yr_loss_ratio|three_yr_loss_ratio|one_yr_lr_band|three_yr_lr_band|lr_increase|       tp_2|      psr_2|psr_2_increase_loading|premium_increase|new_premium_2|          segment|new_premium_psr|monetary_increase|policy_make_mapped|       si_band|count_th_lo|count_ad_repair|tenure_group|  most_used_prov|dmuc_24h_distnct_cell_site_qty|count_ad_replace|peril_filter|gross_ad_repair|gross_ad_replac

# Schema

In [13]:
# Converting column names
proto_df = load_df
# proto_df = proto_df.withColumnRenamed("OLT", "olt")
# proto_df = proto_df.withColumnRenamed("PM", "pm")
# proto_df = proto_df.withColumnRenamed("OLT Type", "olt_type")
# proto_df = proto_df.withColumnRenamed("Precinct Type", "precinct_type")
# proto_df = proto_df.withColumnRenamed("Lat", "lat")
# proto_df = proto_df.withColumnRenamed("Long", "long")
# proto_df = proto_df.withColumnRenamed("Atoll ID", "atoll_site_id")

In [14]:
# source_data = proto_df
# source_data.printSchema()

root
 |-- msisdn0: integer (nullable = true)
 |-- vas_code: string (nullable = true)
 |-- actual_premium: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- one_yr_loss_ratio: string (nullable = true)
 |-- three_yr_loss_ratio: string (nullable = true)
 |-- one_yr_lr_band: string (nullable = true)
 |-- three_yr_lr_band: string (nullable = true)
 |-- lr_increase: string (nullable = true)
 |-- tp_2: string (nullable = true)
 |-- psr_2: string (nullable = true)
 |-- psr_2_increase_loading: string (nullable = true)
 |-- premium_increase: string (nullable = true)
 |-- new_premium_2: string (nullable = true)
 |-- segment: string (nullable = true)
 |-- new_premium_psr: string (nullable = true)
 |-- monetary_increase: string (nullable = true)
 |-- policy_make_mapped: string (nullable = true)
 |-- si_band: string (nullable = true)
 |-- count_th_lo: integer (nullable = true)
 |-- count_ad_repair: integer (nullable = true)
 |-- tenure_group: string (nullable = true)
 |-- most_use

In [19]:
hive_query = '''
SELECT
    msisdn,
    CASE
        WHEN subs_lsm_loc_home_lsm IS NULL THEN NULL
        WHEN floor(subs_lsm_loc_home_lsm) BETWEEN 1 AND 10 THEN 'Group' ||' '|| CAST(floor(subs_lsm_loc_home_lsm) AS STRING)
        ELSE NULL
    END AS subs_lsm_loc_home_lsm
FROM prod_feature_stores.features_history_grouping_subscriber_personal_profile
where feature_date = '20240115'
'''

In [20]:
load_df = spark.sql(hive_query)

In [21]:
load_df.show(10)

+----------+---------------------+
|    msisdn|subs_lsm_loc_home_lsm|
+----------+---------------------+
|0825515424|              Group 6|
|0764817625|              Group 5|
|0825517963|              Group 6|
|0798180401|                 null|
|0828911326|              Group 4|
|0792655478|                 null|
|0826588714|              Group 9|
|0649524605|              Group 5|
|0825519269|                 null|
|0608262402|                 null|
+----------+---------------------+
only showing top 10 rows



In [22]:
source_data = proto_df.join(load_df, proto_df["msisdn0"] == load_df["msisdn"], "left")

In [23]:
source_data.show(10)

+---------+--------+--------------+------+-----------------+-------------------+--------------+----------------+-----------+-----------+-----------+----------------------+----------------+-------------+-----------------+---------------+-----------------+------------------+--------------+-----------+---------------+------------+----------------+------------------------------+----------------+------------+---------------+----------------+-----------+----------+------------+-----------+-------------+---------------+--------------+---------+--------------+-------------+--------+----------+---------------------+
|  msisdn0|vas_code|actual_premium|tenure|one_yr_loss_ratio|three_yr_loss_ratio|one_yr_lr_band|three_yr_lr_band|lr_increase|       tp_2|      psr_2|psr_2_increase_loading|premium_increase|new_premium_2|          segment|new_premium_psr|monetary_increase|policy_make_mapped|       si_band|count_th_lo|count_ad_repair|tenure_group|  most_used_prov|dmuc_24h_distnct_cell_site_qty|count_ad

In [29]:
proto_df  = source_data.drop("msisdn")
proto_df  = proto_df.drop("msisdn0")

In [35]:
proto_df = proto_df.withColumnRenamed("subs_lsm_loc_home_lsm", "lsm")

#  Write the table to Hive

Define the destination database and table.

In [36]:
import datetime
print(datetime.datetime.now())

2024-01-18 14:21:29.204248


In [37]:
proto_df.write.format('orc').saveAsTable('{}.{}'.format(proto_platform, proto_subplatform), mode='overwrite')

In [38]:
print(datetime.datetime.now())

2024-01-18 14:22:38.110505


In [39]:
pandas_df = proto_df .toPandas()

In [40]:
pandas_df.isnull().sum()

vas_code                               0
actual_premium                         0
tenure                                 0
one_yr_loss_ratio                      0
three_yr_loss_ratio                    0
one_yr_lr_band                         0
three_yr_lr_band                       0
lr_increase                            0
tp_2                                   0
psr_2                                  0
psr_2_increase_loading                 0
premium_increase                       0
new_premium_2                          0
segment                              802
new_premium_psr                      429
monetary_increase                      0
policy_make_mapped                     0
si_band                              429
count_th_lo                            0
count_ad_repair                        0
tenure_group                           0
most_used_prov                     13680
dmuc_24h_distnct_cell_site_qty         0
count_ad_replace                       0
peril_filter    

#  User Acceptance Tests

Here define the DEV and PRD databases used by the data engineer. The table name the DE created ought to be the subplatform, which is constant between the two databases, and therefore only defined once.

In [10]:
dev_hive_platform = 'DEV_{platform}'.format(platform=hive_platform)
prd_hive_platform = 'PROD_{platform}'.format(platform=hive_platform)
test_context = 'PROD'
print(f'{dev_hive_platform}.{hive_subplatform}')

DEV_cartrack.vodacom_generator_run_times


In [16]:
non_str_cols = [t[0] for t in proto_df.dtypes if t[1] != 'string']
timestamp_cols = [t[0] for t in proto_df.dtypes if t[1] == 'timestamp']

Describe the prototype table created in Hive.

In [17]:
spark.sql('describe formatted {}.{}'.format(proto_platform, proto_subplatform)).toPandas()

Unnamed: 0,col_name,data_type,comment
0,country,string,
1,region,string,
2,siteid,string,
3,bh_max_cell_load,string,
4,bh_max_cell_load_forecast,string,
5,bh_avg_max_cell_load,string,
6,bh_avg_max_cell_load_forecast,string,
7,high_load_flag_100,string,
8,forecast_high_load_flag_100,string,
9,,,


Describe the DEV table created in Hive.

In [18]:
spark.sql('describe formatted {}.{}'.format(dev_hive_platform, hive_subplatform)).toPandas()

Unnamed: 0,col_name,data_type,comment
0,country,string,
1,region,string,
2,siteid,string,
3,bh_max_cell_load,string,
4,bh_max_cell_load_forecast,string,
5,bh_avg_max_cell_load,string,
6,bh_avg_max_cell_load_forecast,string,
7,high_load_flag_100,string,
8,forecast_high_load_flag_100,string,
9,file_name,string,


Describe the PRD table created in Hive.

In [19]:
spark.sql('describe formatted {}.{}'.format(prd_hive_platform, hive_subplatform)).toPandas()

Unnamed: 0,col_name,data_type,comment
0,country,string,
1,region,string,
2,siteid,string,
3,bh_max_cell_load,string,
4,bh_max_cell_load_forecast,string,
5,bh_avg_max_cell_load,string,
6,bh_avg_max_cell_load_forecast,string,
7,high_load_flag_100,string,
8,forecast_high_load_flag_100,string,
9,file_name,string,


#  Retrieve data from both tables for further tests

In [20]:
if test_context == 'DEV':
    hive_platform = dev_hive_platform
else:
    hive_platform = prd_hive_platform

In [11]:
hive_query = '''
SELECT

 *
 
FROM
    {platform}.{subplatform}
'''.format(platform=dev_hive_platform, subplatform=source_subplatform)

In [12]:
print(hive_query)


SELECT

 *
 
FROM
    DEV_cartrack.vodacom_generator_run_times



In [22]:
hive_data = spark.sql(hive_query)

#  Test counts of rows comparing prototype to Hive

In [23]:
source_count = proto_df.count()
print(source_count)

14314


In [24]:
hive_count = hive_data.count()
print(hive_count)

14348


In [25]:
assert(abs((source_count - hive_count)/source_count) < 0.01)

#  Test types of columns comparing prototype to Hive

In [26]:
proto_cols = set([x.name.lower() for x in source_data.schema.fields])

In [27]:
test_cols = set([x.name.lower() for x in hive_data.schema.fields if x.name.lower() not in ['lts_ingestion_validation_error', 'file_name', ]])

In [28]:
assert(list(set([x.name.lower() for x in source_data.schema.fields]) - set([x.name.lower() for x in hive_data.schema.fields])) == [])

In [29]:
proto_cols - test_cols

set()

In [30]:
#assert(list(set([x.name.lower() for x in hive_data.schema.fields]) - set([x.name.lower() for x in source_data.schema.fields])) == [])

In [31]:
hive_cols = [col.upper() for col in hive_data.columns if not col.endswith('str') and not col.startswith('bdp_lts_')]
#assert(len(set(hive_cols) - set([col.upper() for col in source_data.columns])) == 0)

# Test values of rows comparing prototype to Hive

In [32]:
source_data.select(non_str_cols).describe().toPandas() ==\
hive_data.select(hive_cols).select(non_str_cols).describe().toPandas()

Unnamed: 0,summary
0,True
1,True
2,True
3,True
4,True


In [33]:
source_data = source_data.sort('country', 'region', 'siteid', 'bh_max_cell_load')
source_data.limit(5).toPandas()

Unnamed: 0,country,region,siteid,bh_max_cell_load,bh_max_cell_load_forecast,bh_avg_max_cell_load,bh_avg_max_cell_load_forecast,high_load_flag_100,forecast_high_load_flag_100
0,ZA,CEN,CEN_1000,19.61,19.61,11.36,11.36,False,False
1,ZA,CEN,CEN_10021,77.09,69.6760940551758,33.4533333333333,33.1394414901733,False,False
2,ZA,CEN,CEN_10081,100.56,158.831924438477,72.14,130.628870646159,True,True
3,ZA,CEN,CEN_10101,14.99,16.2291660308838,14.99,16.2291660308838,False,False
4,ZA,CEN,CEN_10921,24.96,33.5579223632813,22.1033333333333,31.9808928171794,False,False


In [34]:
hive_data = hive_data.sort('country', 'region', 'siteid', 'bh_max_cell_load')
hive_data.limit(5).toPandas()

Unnamed: 0,country,region,siteid,bh_max_cell_load,bh_max_cell_load_forecast,bh_avg_max_cell_load,bh_avg_max_cell_load_forecast,high_load_flag_100,forecast_high_load_flag_100,file_name,lts_ingestion_validation_error,bdp_lts_insert_timestamp
0,ZA,CEN,,,,,,False,False,ZA_RadioForecast2GSummary.csv,0,20220729145519924812
1,ZA,CEN,CEN_1000,17.81,33.0430297851563,10.48,17.2617216110229,False,False,ZA_RadioForecast2GSummary.csv,0,20220729145519924812
2,ZA,CEN,CEN_10021,21.79,65.2729110717773,14.85,32.3673521677653,False,False,ZA_RadioForecast2GSummary.csv,0,20220729145519924812
3,ZA,CEN,CEN_10081,105.86,149.93049621582,76.97,112.012232462565,True,True,ZA_RadioForecast2GSummary.csv,0,20220729145519924812
4,ZA,CEN,CEN_10101,39.46,39.4599990844727,39.46,39.4599990844727,False,False,ZA_RadioForecast2GSummary.csv,0,20220729145519924812


In [35]:
diff_data = source_data.join(hive_data, [source_data.siteid == hive_data.siteid,
                                         source_data.country == hive_data.country,
                                         source_data.region == hive_data.region,
                                         source_data.bh_max_cell_load != hive_data.bh_max_cell_load], 
                                         how = 'inner' )
diff_data.toPandas()

Unnamed: 0,country,region,siteid,bh_max_cell_load,bh_max_cell_load_forecast,bh_avg_max_cell_load,bh_avg_max_cell_load_forecast,high_load_flag_100,forecast_high_load_flag_100,country.1,...,siteid.1,bh_max_cell_load.1,bh_max_cell_load_forecast.1,bh_avg_max_cell_load.1,bh_avg_max_cell_load_forecast.1,high_load_flag_100.1,forecast_high_load_flag_100.1,file_name,lts_ingestion_validation_error,bdp_lts_insert_timestamp
0,ZA,CEN,CEN_1000,19.61,19.61,11.36,11.36,False,False,ZA,...,CEN_1000,17.81,33.0430297851563,10.48,17.2617216110229,False,False,ZA_RadioForecast2GSummary.csv,0,20220729145519924812
1,ZA,CEN,CEN_10021,77.09,69.6760940551758,33.4533333333333,33.1394414901733,False,False,ZA,...,CEN_10021,21.79,65.2729110717773,14.85,32.3673521677653,False,False,ZA_RadioForecast2GSummary.csv,0,20220729145519924812
2,ZA,CEN,CEN_10081,100.56,158.831924438477,72.14,130.628870646159,True,True,ZA,...,CEN_10081,105.86,149.93049621582,76.97,112.012232462565,True,True,ZA_RadioForecast2GSummary.csv,0,20220729145519924812
3,ZA,CEN,CEN_10101,14.99,16.2291660308838,14.99,16.2291660308838,False,False,ZA,...,CEN_10101,39.46,39.4599990844727,39.46,39.4599990844727,False,False,ZA_RadioForecast2GSummary.csv,0,20220729145519924812
4,ZA,CEN,CEN_10921,24.96,33.5579223632813,22.1033333333333,31.9808928171794,False,False,ZA,...,CEN_10921,24.52,39.5194549560547,21.5233333333333,37.1372133890788,False,False,ZA_RadioForecast2GSummary.csv,0,20220729145519924812
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
14117,ZA,WES,WES_7641,30.67,63.4095687866211,17.415,33.9410343170166,False,False,ZA,...,WES_7641,18.75,70.775032043457,11.44,38.7100160121918,False,False,ZA_RadioForecast2GSummary.csv,0,20220729145519924812
14118,ZA,WES,WES_7650,92.16,140.236801147461,71.59,107.837645212809,False,True,ZA,...,WES_7650,55.52,143.395141601563,38.0566666666667,101.038122812907,False,True,ZA_RadioForecast2GSummary.csv,0,20220729145519924812
14119,ZA,WES,WES_7661,89.47,107.31462097168,44.83,60.7785898844401,False,True,ZA,...,WES_7661,54.74,110.77587890625,29.8966666666667,60.4599316914876,False,True,ZA_RadioForecast2GSummary.csv,0,20220729145519924812
14120,ZA,WES,WES_9201,54.18,75.1497344970703,22.6,30.1267846425374,False,False,ZA,...,WES_9201,54.03,76.8578491210938,21.4733333333333,31.4332583745321,False,False,ZA_RadioForecast2GSummary.csv,0,20220729145519924812


In [36]:
diff_data.count()

14122

In [27]:
spark.stop()