In [0]:

from pyspark.sql.functions import col, trim, when

# Databricks notebook context: the current user's name, used to build a per-user workspace path.
user_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
file_location = f'/Workspace/Users/{user_name}/data/telco.csv'
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

# Some TotalCharges values are blank. Comparing against exactly " " misses other
# whitespace-only values, so trim first and treat any empty result as missing (NULL).
# The cast to double then only ever sees NULL or the original text.
df_tc_dbl = df.withColumn(
    "TotalCharges",
    when(trim(col("TotalCharges")) == "", None)
    .otherwise(col("TotalCharges"))
    .cast("double"),
)

display(df_tc_dbl)
     


In [0]:
%sh
# List the per-user data directory in long format, including hidden entries.
# NOTE(review): the user directory is hard-coded here, unlike the Python cells which
# build it from the notebook context; other users must edit this path.
ls -la /Workspace/Users/pawel.rubach@sgh.waw.pl/data

In [0]:
# Keep only the columns used by the rest of the analysis.
df_sel = df_tc_dbl.select(['Contract', 'PhoneService', 'TotalCharges', 'MonthlyCharges', 'tenure'])
# Plain-text preview of the first 5 rows, followed by the interactive Databricks table.
df_sel.show(n=5)
display(df_sel)

In [0]:
# Month-to-month customers who also have phone service.
# Column predicates combine with & (AND), | (OR) and ~ (NOT); wrap each comparison in parentheses.
df_fil = df_sel.where(
    (df_sel.Contract == 'Month-to-month')
    & (df_sel.PhoneService == 'Yes')
)
display(df_fil)

In [0]:
# All month-to-month contracts, printed as a plain-text table.
df_fil = df_sel.where(df_sel.Contract == 'Month-to-month')
df_fil.show()

In [0]:
from pyspark.sql.functions import asc, desc

# Rows that are month-to-month OR have TotalCharges below 60.
df_fil = df_sel.where((df_sel.Contract == 'Month-to-month') | (df_sel.TotalCharges < 60))
# Indexing a DataFrame by name yields a Column expression object, not data.
print(type(df_sel['Contract']))

# Contract descending, then TotalCharges ascending within each contract.
df_fil = df_fil.orderBy(df_fil['Contract'].desc(), df_fil['TotalCharges'].asc())

df_fil.show()



In [0]:
# Number of rows for every (Contract, PhoneService) combination.
df_grp = df_sel.groupBy(['Contract', 'PhoneService']).count()
df_grp.show()

In [0]:
# NOTE: importing `round` from pyspark shadows Python's built-in round() for every
# later cell in this notebook (there it builds a Spark Column expression).
from pyspark.sql.functions import avg, count, round

# Per (Contract, PhoneService) group: average TotalCharges rounded to 2 decimals, and row count.
df_grp = (
    df_sel
    .groupBy('Contract', 'PhoneService')
    .agg(
        round(avg('TotalCharges'), 2).alias('Avg TotalCharges'),
        count('*').alias('Count'),
    )
)
df_grp.show()

In [0]:
# Bring the aggregated rows to the driver as a Python list of Row objects.
grp_results = df_grp.collect()
print(type(grp_results), grp_results, sep='\n')
for row in grp_results:
    # Row -> plain dict keyed by the output column names ('Avg TotalCharges', 'Count', ...).
    row_dict = row.asDict()
    print(type(row_dict), row_dict, row_dict['Avg TotalCharges'], sep='\n')
    # .get() returns None instead of raising when the key is absent.
    avg_tc = row_dict.get('Avg TotalCharges')
    print(type(avg_tc))
  

In [0]:
# Apache Parquet - write
#df_grp.write.parquet(f'/Workspace/Users/{user_name}/df.parquet')

# Writing to CSV/TSV
#fileOutPath = f"/Workspace/Users/{user_name}/data/telco_filtered.tsv.gz"
#df_grp.coalesce(1).write.option("header", True).option("delimiter", "\t").option("compression", "gzip").format('csv').save(fileOutPath)

# The aggregate has one row per (Contract, PhoneService) group, so it is collected to the
# driver as a pandas DataFrame and written as a single CSV file.
# index=False: the pandas RangeIndex carries no information and would otherwise be written
# as an extra unnamed first column (read back as "Unnamed: 0").
pd_res = df_grp.toPandas()
pd_res.to_csv(f'/Workspace/Users/{user_name}/data/telco_grp_by.csv', index=False)

TODO: Count how many rows have TotalCharges equal to, and how many not equal to, MonthlyCharges × tenure.
Also show the difference between these two values for each row, sorted so that the largest differences come first.

In [0]:
# Compare the billed TotalCharges with the amount implied by tenure * MonthlyCharges.
# Spark functions are imported under aliases so Python's built-in abs()/round() stay intact,
# and this cell no longer depends on imports made in earlier cells.
from pyspark.sql.functions import abs as spark_abs, col, desc, round as spark_round

df_total = df_sel.withColumn('Total', spark_round(col('tenure') * col('MonthlyCharges'), 2))
df_real = df_total.withColumn('Difference', spark_round(col('Total') - col('TotalCharges'), 2))

# "Largest differences" means largest in magnitude, so sort by |Difference| descending;
# rows with a NULL Difference (missing TotalCharges) sort last.
df_sort = df_real.sort(desc(spark_abs(col('Difference'))))
df_sort.show()

# A NULL Difference is neither equal nor unequal, so those rows are counted separately;
# the three counts add up to total_rows.
df_perfect = df_real.filter(col('Difference') == 0)
df_not_equal = df_real.filter(col('Difference') != 0)
df_unknown = df_real.filter(col('Difference').isNull())
total_rows = df_sel.count()
print('Number of rows with TotalCharges exactly equal to tenure * MonthlyCharges is', df_perfect.count(), 'out of', total_rows, 'rows')
print('Number of rows with TotalCharges not equal to tenure * MonthlyCharges is', df_not_equal.count(), 'out of', total_rows, 'rows')
print('Number of rows with missing TotalCharges is', df_unknown.count(), 'out of', total_rows, 'rows')
 

In [0]:
import datetime
# Current local time as a naive datetime (no tzinfo argument is passed to now()).
now = datetime.datetime.now()
# Last expression of the cell, so its value is rendered as the cell output.
now.timestamp()


In [0]:
from pyspark.sql.functions import lit

# Append a constant string column: every row gets the same literal value.
df_sel_with_fixed = df_sel.select('*', lit('Fixed text').alias('Fixed Value'))
display(df_sel_with_fixed)

In [0]:
from pyspark.sql.functions import udf
# pyspark.sql.types has no DatetimeType (importing it raises ImportError);
# TimestampType is the Spark type for date-time values.
from pyspark.sql.types import StringType, DoubleType, TimestampType, IntegerType

@udf(returnType=StringType())
def convert_contract(contract):
    """Shorten a Contract label.

    Replaces 'Month-to-month' -> 'monthly', 'One year' -> '1 year' and
    'Two year' -> '2 year'; text without these substrings is returned unchanged.

    Args:
        contract: the Contract value of one row, or None when the column is null.

    Returns:
        The shortened label, or None for a null input (str.replace would fail on None).
    """
    if contract is None:
        return None
    for long_form, short_form in (
        ('Month-to-month', 'monthly'),
        ('One year', '1 year'),
        ('Two year', '2 year'),
    ):
        contract = contract.replace(long_form, short_form)
    return contract

df_sel_w_contract_short = df_sel.withColumn('Contract short', convert_contract('Contract'))
df_sel_w_contract_short.show()


In [0]:
# Assign an illustrative base charge to every contract type, paired by position.
# distinct() returns rows in no guaranteed order, so without orderBy the zip() below could
# pair contracts with different charges on each run; sorting makes the mapping deterministic.
contract_types = [row['Contract'] for row in df_sel.select('Contract').distinct().orderBy('Contract').collect()]
print(contract_types)
base_charges_list = [30, 50, 70]
# zip() silently drops unmatched entries, so require a one-to-one pairing.
assert len(contract_types) == len(base_charges_list), (
    f'expected {len(base_charges_list)} contract types, got {len(contract_types)}: {contract_types}'
)
base_charges_dict = dict(zip(contract_types, base_charges_list))
print(base_charges_dict)

df_base_charges = spark.createDataFrame(list(base_charges_dict.items()), ['Contract', 'BaseCharges'])
display(df_base_charges)

In [0]:

# Base-charge rows for contracts whose label ends in "year".
df_base_charges_years = df_base_charges.where(col('Contract').endswith('year'))
df_base_charges_years.show()

# left_anti keeps only the df_sel rows whose Contract has NO match in df_base_charges_years
# (no BaseCharges column is added). For comparison:
#   how='left'  -> every df_sel row, BaseCharges NULL where there is no match
#   how='inner' -> only the matching rows, with their BaseCharges
df_joined = df_sel.join(df_base_charges_years, 'Contract', 'left_anti')
df_joined.show()
print(df_joined.count())

TODO: Check whether customers with dependents pay more on average than customers without dependents. Give the answer as a single boolean value (`True` or `False`).
