In [1]:
import numpy as np
import pandas as pd
import pyspark

In [2]:
import os
os.environ["HADOOP_USER_NAME"] = "hdfs"
os.environ["PYTHON_VERSION"] = "3.10.7"

In [3]:
# Load Spark libraries

import pyspark

from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *


In [4]:
# Print all outputs in a block - not just the last one

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName("PythonSpark") \
        .config("hive.metastore.uris",
                "thrift://hive-metastore:9083") \
        .config("spark.sql.warehouse.dir",
                "http://namenode:50070/user/hive/warehouse") \
        .enableHiveSupport() \
        .getOrCreate()

print(spark.version)

3.3.1


In [6]:
spark

In [7]:
#connecting to Hive data
bill_records = spark.read \
               .option("header", "true") \
               .option("inferSchema",  "true") \
               .csv("hdfs://namenode:8020/examples/telco/data")

In [8]:
bill_records.printSchema()

root
 |-- Month: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Date_of_Birth: timestamp (nullable = true)
 |-- Service_Provider: string (nullable = true)
 |-- Account_Type: string (nullable = true)
 |-- Billed_Amount: double (nullable = true)
 |-- Paid_Amount: double (nullable = true)



In [9]:
#print sample records
bill_records.take(5)

[Row(Month='July', Name='YFNIQCDTQW', Address='52Delhi', Date_of_Birth=datetime.datetime(1989, 9, 24, 0, 0), Service_Provider='Airtel', Account_Type='P', Billed_Amount=6155.98, Paid_Amount=2701.24),
 Row(Month='March', Name='QOEORMGDAT', Address='45Kolkata', Date_of_Birth=datetime.datetime(1995, 7, 7, 0, 0), Service_Provider='Idea', Account_Type='C', Billed_Amount=5539.41, Paid_Amount=3110.41),
 Row(Month='October', Name='XEOFGPIYZC', Address='80Chennai', Date_of_Birth=datetime.datetime(1957, 8, 23, 0, 0), Service_Provider='Vodafone', Account_Type='P', Billed_Amount=2844.71, Paid_Amount=2148.03),
 Row(Month='November', Name='MKHAZJDSLP', Address='10Delhi', Date_of_Birth=datetime.datetime(1994, 12, 15, 0, 0), Service_Provider='Jio', Account_Type='C', Billed_Amount=5254.45, Paid_Amount=5187.47),
 Row(Month='November', Name='AVJUQBHRZY', Address='19Delhi', Date_of_Birth=datetime.datetime(1980, 7, 28, 0, 0), Service_Provider='Airtel', Account_Type='C', Billed_Amount=3776.09, Paid_Amount=21

In [None]:
bill_records.count()

200

In [None]:
bill_records.na.drop().count()

200

In [None]:
#printed in more readale format
spark.createDataFrame(bill_records.take(500)).toPandas()
bill_records

  series = series.astype(t, copy=False)


Unnamed: 0,Month,Name,Address,Date_of_Birth,Service_Provider,Account_Type,Billed_Amount,Paid_Amount
0,July,YFNIQCDTQW,52Delhi,1989-09-24,Airtel,P,6155.98,2701.24
1,March,QOEORMGDAT,45Kolkata,1995-07-07,Idea,C,5539.41,3110.41
2,October,XEOFGPIYZC,80Chennai,1957-08-23,Vodafone,P,2844.71,2148.03
3,November,MKHAZJDSLP,10Delhi,1994-12-15,Jio,C,5254.45,5187.47
4,November,AVJUQBHRZY,19Delhi,1980-07-28,Airtel,C,3776.09,2146.34
...,...,...,...,...,...,...,...,...
195,October,GSHQJPUCJH,22Mumbai,1963-10-01,Jio,C,3614.78,2653.31
196,June,NNIHNAZXLC,14Kolkata,1981-06-15,Airtel,P,9020.82,6684.20
197,August,KPGVYVIEFE,65Mumbai,1961-02-06,Idea,P,1275.42,1246.62
198,November,PQNNEILFYM,27Chennai,1974-10-23,Airtel,P,6482.77,3405.82


DataFrame[Month: string, Name: string, Address: string, Date_of_Birth: timestamp, Service_Provider: string, Account_Type: string, Billed_Amount: double, Paid_Amount: double]

In [None]:
from pyspark.sql.types import *
#create structure for the data
schema = StructType([\
    StructField("Month", StringType(), True),\
    StructField("Name", StringType(), True),\
    StructField("Address",  StringType(), True),
    StructField("Date_of_Birth",  DateType(), True),
    StructField("Service_Provider", StringType(), True),
    StructField("Account_Type",  StringType(), True),                     
    StructField("Billed_Amount",  FloatType(), True),
    StructField("Paid_Amount",  FloatType(), True)])


In [None]:
#readind data from the telco file on hive cluster
telco_df = spark.read \
                    .option("header", "true") \
                    .schema(schema) \
                    .csv("hdfs://namenode:8020/examples/telco/data", sep=r",")

In [None]:
telco_df.printSchema()

root
 |-- Month: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Date_of_Birth: date (nullable = true)
 |-- Service_Provider: string (nullable = true)
 |-- Account_Type: string (nullable = true)
 |-- Billed_Amount: float (nullable = true)
 |-- Paid_Amount: float (nullable = true)



In [None]:
telco_df.take(5)

[Row(Month='July', Name='YFNIQCDTQW', Address='52Delhi', Date_of_Birth=datetime.date(1989, 9, 24), Service_Provider='Airtel', Account_Type='P', Billed_Amount=6155.97998046875, Paid_Amount=2701.239990234375),
 Row(Month='March', Name='QOEORMGDAT', Address='45Kolkata', Date_of_Birth=datetime.date(1995, 7, 7), Service_Provider='Idea', Account_Type='C', Billed_Amount=5539.41015625, Paid_Amount=3110.409912109375),
 Row(Month='October', Name='XEOFGPIYZC', Address='80Chennai', Date_of_Birth=datetime.date(1957, 8, 23), Service_Provider='Vodafone', Account_Type='P', Billed_Amount=2844.7099609375, Paid_Amount=2148.030029296875),
 Row(Month='November', Name='MKHAZJDSLP', Address='10Delhi', Date_of_Birth=datetime.date(1994, 12, 15), Service_Provider='Jio', Account_Type='C', Billed_Amount=5254.4501953125, Paid_Amount=5187.47021484375),
 Row(Month='November', Name='AVJUQBHRZY', Address='19Delhi', Date_of_Birth=datetime.date(1980, 7, 28), Service_Provider='Airtel', Account_Type='C', Billed_Amount=377

In [None]:
telco_df_p = spark.createDataFrame(telco_df.take(500)).toPandas()

In [None]:
telco_df_p

Unnamed: 0,Month,Name,Address,Date_of_Birth,Service_Provider,Account_Type,Billed_Amount,Paid_Amount
0,July,YFNIQCDTQW,52Delhi,1989-09-24,Airtel,P,6155.979980,2701.239990
1,March,QOEORMGDAT,45Kolkata,1995-07-07,Idea,C,5539.410156,3110.409912
2,October,XEOFGPIYZC,80Chennai,1957-08-23,Vodafone,P,2844.709961,2148.030029
3,November,MKHAZJDSLP,10Delhi,1994-12-15,Jio,C,5254.450195,5187.470215
4,November,AVJUQBHRZY,19Delhi,1980-07-28,Airtel,C,3776.090088,2146.340088
...,...,...,...,...,...,...,...,...
195,October,GSHQJPUCJH,22Mumbai,1963-10-01,Jio,C,3614.780029,2653.310059
196,June,NNIHNAZXLC,14Kolkata,1981-06-15,Airtel,P,9020.820312,6684.200195
197,August,KPGVYVIEFE,65Mumbai,1961-02-06,Idea,P,1275.420044,1246.619995
198,November,PQNNEILFYM,27Chennai,1974-10-23,Airtel,P,6482.770020,3405.820068


In [353]:
#a.	Market share of the different telephone service providers in Bangalore

telco_df_blore = telco_df.filter(col('Address').contains('Bangalore'))
telco_df_blore.show()

# calculating total billed amount by subscriber base
df_AggSales = telco_df_blore.groupBy('Service_Provider') \
        .agg(sum('Billed_Amount') 
        .cast('decimal(38,2)') \
        .alias('Total_Billed_Amount')) \

print(df_AggSales.show())




+---------+----------+-----------+-------------+----------------+------------+-------------+-----------+
|    Month|      Name|    Address|Date_of_Birth|Service_Provider|Account_Type|Billed_Amount|Paid_Amount|
+---------+----------+-----------+-------------+----------------+------------+-------------+-----------+
|     July|BBXYGBWKNX|46Bangalore|   1989-10-07|        Vodafone|           P|      6908.64|     5558.1|
|     July|XMHUSSUONO|67Bangalore|   1958-06-18|            BSNL|           C|      2784.77|    2063.64|
|  October|LEYRLTHOLB|41Bangalore|   1973-04-17|        Vodafone|           P|      1347.67|    1091.02|
|  January|DSOFUUEPXM|30Bangalore|   1980-11-21|            Idea|           C|      5047.67|    4863.85|
|  January|SYXBWLHZXM| 2Bangalore|   1973-10-02|            BSNL|           P|      6709.65|    4341.03|
|      May|BBZLJVXCWG|13Bangalore|   1977-06-21|          Airtel|           P|      9833.37|    2268.53|
|     June|LZDQYUAPXL|34Bangalore|   1976-06-15|       

In [355]:
#calculate total billed amount for all Subscriber

telco_df_p_b = telco_df_p[telco_df_p.Address.str.contains('Bangalore')]
#print(telco_df_p_b.head(2))
telco_df_p_total =telco_df_p_b.groupby(['Service_Provider']).agg({'Billed_Amount': ['sum']}).rename(columns={'sum':'Total_Billed_Amount'}).round(2)
telco_df_p_total

Unnamed: 0_level_0,Billed_Amount
Unnamed: 0_level_1,Total_Billed_Amount
Service_Provider,Unnamed: 1_level_2
Airtel,28899.14
BSNL,40187.83
Idea,85546.73
Jio,13657.9
Vodafone,28039.92


In [358]:
import pyspark.sql.functions as f
from pyspark.sql.window import Window

# Calculate percentage share for earch subscriber by total share

total = telco_df_p_total["Billed_Amount"].sum()
telco_df_p_total["PercentageShare"] = (telco_df_p_total["Billed_Amount"]/total*100).round(2)

print("\n Market share of the different telephone service providers in Bangalore\n")
telco_df_p_total.head()



 Market share of the different telephone service providers in Bangalore



Unnamed: 0_level_0,Billed_Amount,PercentageShare
Unnamed: 0_level_1,Total_Billed_Amount,Unnamed: 2_level_1
Service_Provider,Unnamed: 1_level_2,Unnamed: 2_level_2
Airtel,28899.14,14.72
BSNL,40187.83,20.47
Idea,85546.73,43.57
Jio,13657.9,6.96
Vodafone,28039.92,14.28


In [35]:
#b.	Average age of the subscriber, aggregated by the service provider

telco_age = telco_df
# add column Age
telco_age.withColumn('Age', \
         floor(datediff(current_date(), to_date(col('Date_of_Birth'), 'M/d/yyyy'))/365.25)) \
         
print('\nAverage Age of Subsciber by Service Provider is: \n')

telco_age.groupBy('Service_Provider') \
        .agg(avg(floor(datediff(current_date(), to_date(col('Date_of_Birth'), 'M/d/yyyy'))/365.25)) 
        .cast('decimal(38,2)') \
        .alias('Average_Age_Subscriber'))\
        .show(truncate=False)

DataFrame[Month: string, Name: string, Address: string, Date_of_Birth: date, Service_Provider: string, Account_Type: string, Billed_Amount: float, Paid_Amount: float, Age: bigint]


Average Age of Subsciber by Service Provider is: 

+----------------+----------------------+
|Service_Provider|Average_Age_Subscriber|
+----------------+----------------------+
|Vodafone        |47.71                 |
|Idea            |45.63                 |
|Airtel          |48.53                 |
|Jio             |46.55                 |
|BSNL            |46.34                 |
+----------------+----------------------+



In [360]:
#c.	Area-wise subscriber base for each provider

from pyspark.sql.functions import col, when
# Adding column area by city
telco_df_area = telco_df.withColumn('Area', 
                                              when(col('Address').like('%Delhi'), 'PunjabiBagh') 
                                              .when(col('Address').like('%Kolkata'), 'Science City') 
                                              .when(col('Address').like('%Chennai'), 'Anna Nagar') 
                                              .when(col('Address').like('%Mumbai'), 'Juhu') 
                                              .when(col('Address').like('%Bangalore'), 'Bannerghatta') 
                                             )

#print(type(telco_df_area))
#telco_df_area_df1 = spark.createDataFrame(telco_df_area.take(500)).toPandas()
#print(telco_df_area_df1.head(5))


df_AggSales = telco_df_area.groupBy('Service_Provider','Area') \
        .agg(sum('Billed_Amount').cast('decimal(38,2)') \
        .alias('Total_Billed_Amount')) \
        .orderBy(col('Service_Provider'))

print('\n Area-wise subscriber base for each provider: \n')
print(df_AggSales.show(50))

spark.createDataFrame(df_AggSales.take(500)).toPandas()


 Area-wise subscriber base for each provider: 

+----------------+------------+-------------------+
|Service_Provider|        Area|Total_Billed_Amount|
+----------------+------------+-------------------+
|          Airtel|Science City|           42933.25|
|          Airtel| PunjabiBagh|           80478.89|
|          Airtel|Bannerghatta|           28899.14|
|          Airtel|  Anna Nagar|           52215.92|
|          Airtel|        Juhu|           93119.37|
|            BSNL|        Juhu|           70269.01|
|            BSNL|Science City|           48516.23|
|            BSNL|Bannerghatta|           40187.83|
|            BSNL| PunjabiBagh|           41659.47|
|            BSNL|  Anna Nagar|           48801.99|
|            Idea| PunjabiBagh|           28143.61|
|            Idea|        Juhu|           49810.04|
|            Idea|Bannerghatta|           85546.73|
|            Idea|Science City|           45772.29|
|            Idea|  Anna Nagar|           57658.24|
|             J

Unnamed: 0,Service_Provider,Area,Total_Billed_Amount
0,Airtel,Science City,42933.25
1,Airtel,PunjabiBagh,80478.89
2,Airtel,Bannerghatta,28899.14
3,Airtel,Anna Nagar,52215.92
4,Airtel,Juhu,93119.37
5,BSNL,Juhu,70269.01
6,BSNL,Science City,48516.23
7,BSNL,Bannerghatta,40187.83
8,BSNL,PunjabiBagh,41659.47
9,BSNL,Anna Nagar,48801.99


In [332]:
#d.	Amounts that are due to the different telephone service providers, which may be starkly different from the market share
#calculate total Un-billed amount

telco_df_p_ub = telco_df.withColumn('UnBilled_Amount',round(col('Billed_Amount') - col('Paid_Amount'),2))
spark.createDataFrame(telco_df_p_ub.take(5)).toPandas()

#calculate total unbilled amoun
Total_UnBilled_Amount =  telco_df_p_ub.agg(round(sum("UnBilled_Amount"),2).alias('Total_UnBilled_Amount'))
var_total_unbilled_amount = Total_UnBilled_Amount.head()[0]

# Aggregate Unilled Amount by Service Provider
df_AggSales2 = telco_df_p_ub.groupBy('Service_Provider') \
        .agg(round(sum(col('UnBilled_Amount')/var_total_unbilled_amount*100),2).alias('Percentage_For_UnBilled_Amount'))


print('\n Unbilled Amount percentage :\n')
spark.createDataFrame(df_AggSales2.take(500)).toPandas()

Unnamed: 0,Month,Name,Address,Date_of_Birth,Service_Provider,Account_Type,Billed_Amount,Paid_Amount,UnBilled_Amount
0,July,YFNIQCDTQW,52Delhi,1989-09-24,Airtel,P,6155.97998,2701.23999,3454.73999
1,March,QOEORMGDAT,45Kolkata,1995-07-07,Idea,C,5539.410156,3110.409912,2429.0
2,October,XEOFGPIYZC,80Chennai,1957-08-23,Vodafone,P,2844.709961,2148.030029,696.679993
3,November,MKHAZJDSLP,10Delhi,1994-12-15,Jio,C,5254.450195,5187.470215,66.980003
4,November,AVJUQBHRZY,19Delhi,1980-07-28,Airtel,C,3776.090088,2146.340088,1629.75



 Unbilled Amount percentage :



Unnamed: 0,Service_Provider,Percentage_For_UnBilled_Amount
0,Vodafone,17.98
1,Idea,19.84
2,Airtel,25.21
3,Jio,13.1
4,BSNL,23.88


In [333]:

#calcualte Billed Amount percentage by market share
Total_UnBilled_Amount =  telco_df.agg(round(sum("Billed_Amount"),2).alias('Total_Billed_Amount'))

var_total_billed_amount = Total_UnBilled_Amount.head()[0]
var_total_billed_amount

# Aggregate billed Amount by Service Provider

df_AggSales3 = telco_df_p_ub.groupBy('Service_Provider') \
        .agg(round(sum(col('Billed_Amount')/var_total_billed_amount*100),2).alias('Percentage_For_Billed_Amount'))
#df_AggSales3.head(5)
print('\n Unbilled Amount percentage :\n')
spark.createDataFrame(df_AggSales3.take(500)).toPandas()

1161840.6


 Unbilled Amount percentage :



Unnamed: 0,Service_Provider,Percentage_For_Billed_Amount
0,Vodafone,17.13
1,Idea,22.97
2,Airtel,25.62
3,Jio,12.8
4,BSNL,21.47


In [334]:
#Join Billed and Unbilled amounts DF to present the comparison

result = pd.concat([spark.createDataFrame(df_AggSales2.take(100)).toPandas(), spark.createDataFrame(df_AggSales3.take(100)).toPandas()], axis=1, join="inner" )

print('\n Billed to Unbilled comparison by Service provider \n')
result



 Billed to Unbilled comparison by Service provider 



Unnamed: 0,Service_Provider,Percentage_For_UnBilled_Amount,Service_Provider.1,Percentage_For_Billed_Amount
0,Vodafone,17.98,Vodafone,17.13
1,Idea,19.84,Idea,22.97
2,Airtel,25.21,Airtel,25.62
3,Jio,13.1,Jio,12.8
4,BSNL,23.88,BSNL,21.47


Conclusion: Unbilled amounts are promotional to the total market share by the Serice providers, Values are quite close to each other. In case of Idea, BSNL the unbilled market share is high compared to total billed Amount.




In [349]:
#e.	Counts of subscriber accounts by corporate or personal, aggregated by the service provider

#Taking copy of the master data

telco_df_AccountType = telco_df
df_AccountTypeBySubs = telco_df_AccountType.groupBy('Service_Provider','Account_Type') \
        .count() \
        .orderBy(col('Service_Provider'))

print('\n Counts of subscriber accounts by corporate or personal, aggregated by the service provider: \n')
#print(df_AccountTypeBySubs.show(50))
spark.createDataFrame(df_AccountTypeBySubs.take(100)).toPandas()



 Counts of subscriber accounts by corporate or personal, aggregated by the service provider: 



Unnamed: 0,Service_Provider,Account_Type,count
0,Airtel,C,23
1,Airtel,P,30
2,BSNL,P,20
3,BSNL,C,24
4,Idea,P,13
5,Idea,C,27
6,Jio,C,19
7,Jio,P,10
8,Vodafone,C,11
9,Vodafone,P,23
