# Step 01. Install All Dependencies

This step installs Java 8 (OpenJDK), Apache Spark 2.3.3 and the findspark library, which lets Python locate the Spark installation and import PySpark so that we can work on the given Big Data.

In [1]:
import os
#OpenJDK Dependencies for Spark
os.system('apt-get install openjdk-8-jdk-headless -qq > /dev/null')

#Download Apache Spark
os.system('wget -q http://apache.osuosl.org/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz') 

#Apache Spark and Hadoop Unzip
os.system('tar xf spark-2.3.3-bin-hadoop2.7.tgz')

#FindSpark Install
os.system('pip install -q findspark')

0
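`os.system` returns an exit status instead of raising an error, and the `0` above is only the status of the last command in the cell. A minimal sketch of a stricter runner follows; the helper name `run_all` and the demonstration commands are assumptions for illustration and are not part of the original notebook.

```python
import subprocess
import sys

def run_all(commands):
    """Run each shell command in order and stop at the first failure.

    Hypothetical helper: returns the list of commands that completed.
    """
    done = []
    for cmd in commands:
        # check=True raises CalledProcessError on a non-zero exit status
        subprocess.run(cmd, shell=True, check=True)
        done.append(cmd)
    return done

# Harmless demonstration commands that call the current Python interpreter
ok_cmd = f'"{sys.executable}" -c "pass"'
bad_cmd = f'"{sys.executable}" -c "import sys; sys.exit(3)"'

completed = run_all([ok_cmd])

try:
    run_all([ok_cmd, bad_cmd])
    failed_code = 0
except subprocess.CalledProcessError as err:
    # The failing command's exit status is available on the exception
    failed_code = err.returncode
```

Passing the four install commands from the cell above to such a helper would surface a failed download or install immediately rather than at a later step.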

# Step 02. Set Environment Variables
Point `JAVA_HOME` and `SPARK_HOME` at the directories where Java and Spark are installed. The paths below assume the installation from Step 01; adjust them to your own configuration and double-check them before you proceed.

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.3-bin-hadoop2.7"
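One way to double-check the two variables is to confirm that each one points at an existing directory. The sketch below uses a hypothetical helper, `missing_dirs`, and a throwaway directory for the demonstration, so it makes no claim about where Java or Spark actually live on your machine.

```python
import os
import tempfile

def missing_dirs(var_names, environ=os.environ):
    """Return the variable names that are unset or do not point at a directory."""
    return [name for name in var_names
            if not os.path.isdir(environ.get(name, ""))]

# Demonstration with a temporary directory instead of the real install paths
with tempfile.TemporaryDirectory() as tmp:
    demo_env = {
        "JAVA_HOME": tmp,                                # exists
        "SPARK_HOME": os.path.join(tmp, "no-such-dir"),  # does not exist
    }
    problems = missing_dirs(["JAVA_HOME", "SPARK_HOME"], demo_env)

print(problems)
```

In the notebook, calling `missing_dirs(["JAVA_HOME", "SPARK_HOME"])` after the cell above should return an empty list if both paths are correct.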

# Step 03. ELT - Load the Data: Mega Cloud Access
This is an alternative approach that loads datasets already stored in the [**Mega Cloud**](https://mega.nz) repository. First install the necessary packages; the files can then be downloaded directly from the cloud by supplying their link URLs.

In [3]:
import os
os.system('git clone https://github.com/jeroenmeulenaar/python3-mega.git')
os.chdir('python3-mega')
os.system('pip install -r requirements.txt')

0

# Step 04. ELT - Load the Data: Read Uploaded Dataset
In this approach, the datasets previously uploaded to Mega Cloud are downloaded directly from their link URLs.

In [0]:
from mega import Mega
os.chdir('../')
m_prepaid = Mega.from_ephemeral()
m_prepaid.download_from_url('https://mega.nz/#!kwRG2YyC!8TU2aoENtn98M9FHvGEjbk4iIsxBDBERw_H_wFzUchA')

In [0]:
m_postpaid = Mega.from_ephemeral()
m_postpaid.download_from_url('https://mega.nz/#!R0RWwAKA!4HK8qDfIxCFo40mS7hnYjo5XiBYmGhUcB23rN5uSQQk')

In [0]:
m_crm = Mega.from_ephemeral()
m_crm.download_from_url('https://mega.nz/#!soBChSAY!gnyBRDrglAgkQffIAOqHDGGoOViYywT6eXc2DYZlz5E')

# Step 05. Start a Spark Session
The code below initialises findspark and starts a local Spark session that uses all available cores.

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ml-datathon19-easy03').master("local[*]").getOrCreate()

# Step 06. Data Cleaning - Phase 01 Data Schema View
Now let's load the CRM data into a DataFrame and look at its schema before we start cleaning the datasets.

In [8]:
df_crm = spark.read.csv('crm.csv', header = False, inferSchema = True)
df_crm.dtypes

[('_c0', 'string'),
 ('_c1', 'string'),
 ('_c2', 'int'),
 ('_c3', 'string'),
 ('_c4', 'string'),
 ('_c5', 'string')]

# Step 07. Data Cleaning - Phase 02 Header Update
The schema shows that this dataset has no header row, so the columns carry the default names `_c0` to `_c5`. We therefore rename each column and then verify the result.

In [0]:
df_crm = df_crm.withColumnRenamed('_c0', 'msisdn')

In [0]:
df_crm = df_crm.withColumnRenamed('_c1', 'gender')

In [0]:
df_crm = df_crm.withColumnRenamed('_c2', 'year_of_birth')

In [0]:
df_crm = df_crm.withColumnRenamed('_c3', 'system_status')

In [0]:
df_crm = df_crm.withColumnRenamed('_c4', 'mobile_type')

In [0]:
df_crm = df_crm.withColumnRenamed('_c5', 'value_segment')

In [15]:
df_crm.head()

Row(msisdn='aeef4233d9ad34e41f7ecf48d64646f8', gender='MALE', year_of_birth=1985, system_status='ACTIVE', mobile_type='Prepaid', value_segment='Tier_3')

# Step 08. Data Cleaning - Phase 03 Row Count and Unique Row Count
We need to know the number of rows. We count all rows to get an overview of the data, then count the distinct rows of the Spark dataset to see how many rows are duplicates.


In [16]:
print("Total Rows: ")
df_crm.count()

Total Rows: 


97372872

In [17]:
print("Unique Rows: ")
df_crm.distinct().count()

Unique Rows: 


97355992

# Step 09. Data Cleaning - Phase 04 Removing duplicate rows

Roughly `0.017%` of the rows in the table are `DUPLICATE` values. Since this is negligible compared to the original row count, we will now filter the dataset to remove all `DUPLICATE` rows.

In [0]:
df_crm = df_crm.dropDuplicates()
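As a quick check, the duplicate share quoted for this step can be recomputed from the two counts printed in Step 08. The variable names are chosen here for illustration.

```python
total_rows = 97372872    # df_crm.count() before deduplication
unique_rows = 97355992   # df_crm.distinct().count()

duplicate_rows = total_rows - unique_rows
duplicate_pct = 100 * duplicate_rows / total_rows

print(f"{duplicate_rows} duplicate rows ({duplicate_pct:.3f}% of the table)")
```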

# Step 10. Data Cleaning - Phase 05 Review NULL values in each column
Since the total row count and the unique row count are now the same, there are no duplicate rows left in the table. Next we count the NULL values per column to check whether any values are missing.

In [19]:
import pyspark.sql.functions as F
df_agg = df_crm.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df_crm.columns])
df_agg.show()

+------+--------+-------------+-------------+-----------+-------------+
|msisdn|  gender|year_of_birth|system_status|mobile_type|value_segment|
+------+--------+-------------+-------------+-----------+-------------+
|     0|18963284|      3206817|            1|          1|      8173367|
+------+--------+-------------+-------------+-----------+-------------+



# Step 11. Data Cleaning - Advanced Data Filling
Roughly `19.5%` of the `gender` values, `3.3%` of the `year_of_birth` values and `8.4%` of the `value_segment` values are `NULL`. Since this is a large chunk of the data, we will now apply some more advanced techniques to fill the required columns so that the later operations remain meaningful.
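The NULL shares can be recomputed from the per-column counts in Step 10 and the deduplicated row count from Step 08. This is a small arithmetic sketch and the variable names are illustrative.

```python
unique_rows = 97355992  # row count after dropDuplicates()

# NULL counts per column, copied from the Step 10 output
null_counts = {
    "gender": 18963284,
    "year_of_birth": 3206817,
    "value_segment": 8173367,
}

# Share of NULL values per column, in percent, rounded to one decimal place
null_pct = {col: round(100 * n / unique_rows, 1) for col, n in null_counts.items()}

for col, pct in null_pct.items():
    print(f"{col}: {pct}% NULL")
```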

In [20]:
df_crm.groupBy("year_of_birth").count().sort("count", ascending=False).show() 

+-------------+-------+
|year_of_birth|  count|
+-------------+-------+
|         1988|4722270|
|         1982|4382183|
|         1987|4342609|
|         1989|4210311|
|         1985|4157539|
|         1983|3476667|
|         1980|3441825|
|         1977|3369041|
|         1986|3286140|
|         null|3206817|
|         1984|3116875|
|         1994|3021323|
|         1990|3003995|
|         1972|2946266|
|         1991|2821666|
|         1993|2760131|
|         1992|2744080|
|         1981|2640796|
|         1979|2572952|
|         1978|2531215|
+-------------+-------+
only showing top 20 rows



In [21]:
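# Keep birth years from 1945 to 2000; rows with a null year_of_birth are
# dropped too, because a comparison against null does not evaluate to true.
df_crm_year_10 = df_crm.filter(df_crm["year_of_birth"] < 2001)
df_crm_year_10_75 = df_crm_year_10.filter(df_crm_year_10["year_of_birth"] > 1944)
df_crm_year_10_75.groupBy("year_of_birth").count().sort("year_of_birth", ascending=False).show()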

+-------------+-------+
|year_of_birth|  count|
+-------------+-------+
|         2000|   7333|
|         1999| 141815|
|         1998| 433512|
|         1997| 897077|
|         1996| 980577|
|         1995|1165529|
|         1994|3021323|
|         1993|2760131|
|         1992|2744080|
|         1991|2821666|
|         1990|3003995|
|         1989|4210311|
|         1988|4722270|
|         1987|4342609|
|         1986|3286140|
|         1985|4157539|
|         1984|3116875|
|         1983|3476667|
|         1982|4382183|
|         1981|2640796|
+-------------+-------+
only showing top 20 rows



In [22]:
df_crm_year_10_75.groupBy("gender").count().sort("count", ascending=False).show() 

+-------------+--------+
|       gender|   count|
+-------------+--------+
|         Male|61262532|
|         null|18725584|
|       Female| 7472728|
|         MALE| 3807337|
|Not Available| 1116998|
|       FEMALE| 1047494|
|            M|   44518|
|         male|   10238|
|       female|    3029|
|            U|    2664|
|            F|    1508|
|        Other|    1293|
|            .|     642|
|          N/A|     623|
|            3|     244|
|      Female.|     212|
|            ]|     129|
|            `|     107|
|            +|      60|
|            B|      59|
+-------------+--------+
only showing top 20 rows



#Test From Here

In [0]:
# Collapse the spelling variants of 'Male' into a single label
df_crm_year_10_75_x = df_crm_year_10_75.replace(['M', 'm', 'MALE', 'male','Male.'], ['Male', 'Male', 'Male', 'Male', 'Male'], 'gender')
# Collapse the spelling variants of 'Female'; note that 'Not Available' is also mapped to 'Female'
df_crm_year_10_75_xy = df_crm_year_10_75_x.replace(['F', 'f', 'FEMALE', 'female', 'Female.', 'Female]', 'FEMELE', 'FE', 'Not Available', 'Female3'], ['Female', 'Female', 'Female', 'Female', 'Female', 'Female', 'Female', 'Female', 'Female', 'Female'], 'gender')

In [35]:
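# Fill every remaining null gender with 'Female'
df_crm_year_10_75_xy = df_crm_year_10_75_xy.fillna({'gender':'Female'})
# Recount per label to check the effect of the replacements
df_crm_year_10_75_xy.groupBy("gender").count().sort("count", ascending=False).show()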

+-------+--------+
| gender|   count|
+-------+--------+
|   Male|65124655|
| Female|28367685|
|      U|    2664|
|  Other|    1293|
|      .|     642|
|    N/A|     623|
|      3|     244|
|      ]|     129|
|      `|     107|
|      +|      60|
|      B|      59|
|      0|      46|
|      ~|      33|
|     \\|      30|
|      D|      28|
|      S|      25|
|      P|      20|
|Female`|      17|
|FemaleF|      13|
|FemaleH|      12|
+-------+--------+
only showing top 20 rows
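The output above still contains stray labels such as `.`, `3` and `FemaleF` that fixed replacement lists do not catch. One alternative, sketched here in plain Python and not the method used in this notebook, is to normalise case and strip non-letter characters before matching. The function name `normalise_gender` and the `Unknown` bucket are assumptions made for illustration. Unlike the cells above, this sketch does not map missing values to `Female`.

```python
import re

MALE = {"m", "male"}
FEMALE = {"f", "female"}

def normalise_gender(raw):
    """Map a raw gender string to 'Male', 'Female' or 'Unknown'."""
    if raw is None:
        return "Unknown"
    # Lower-case and drop everything except letters, e.g. 'Female.' -> 'female'
    letters = re.sub(r"[^a-z]", "", raw.lower())
    if letters in MALE:
        return "Male"
    if letters in FEMALE:
        return "Female"
    return "Unknown"

samples = ["MALE", "male", "M", "Female.", "FEMALE", "F", "Female3", "N/A", None, "]"]
cleaned = [normalise_gender(s) for s in samples]

for raw, label in zip(samples, cleaned):
    print(repr(raw), "->", label)
```

Values with extra letters, such as `FemaleF`, still fall into `Unknown` here. Applying a rule like this to the DataFrame, for example through a Spark UDF, would change the per-gender totals, so it is offered only as a design alternative.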



In [36]:
df_prepaid = spark.read.csv('prepaid.csv', header = True, inferSchema = True)
df_prepaid.dtypes

[('imei_tac', 'string'),
 ('msisdn', 'string'),
 ('years', 'int'),
 ('months', 'int'),
 ('total_volume_mb', 'int'),
 ('bundle_volume_mb', 'int'),
 ('pay_per_use_volume', 'int'),
 ('free_volume_mb', 'int')]

In [37]:
df_postpaid = spark.read.csv('postpaid.csv', header = True, inferSchema = True)
df_postpaid.dtypes

[('imei_tac', 'string'),
 ('msisdn', 'string'),
 ('years', 'int'),
 ('months', 'int'),
 ('total_volume_mb', 'int'),
 ('bundle_volume_mb', 'int'),
 ('pay_per_use_volume', 'int'),
 ('free_volume_mb', 'int')]

In [0]:
ta = df_crm_year_10_75_xy.alias('ta')
tb = df_prepaid.alias('tb')
tc = df_postpaid.alias('tc')
inner_join_pre = ta.join(tb, ta.msisdn == tb.msisdn)
inner_join_post = ta.join(tc, ta.msisdn == tc.msisdn)

In [41]:
from pyspark.sql.functions import desc
Easy02 = inner_join_pre.groupBy('gender').agg({'total_volume_mb':'sum'}).sort(desc("sum(total_volume_mb)"))
Easy02 = Easy02.withColumnRenamed("sum(total_volume_mb)", "Total Data Consumed (Prepaid)")
Easy02 = Easy02.withColumnRenamed("gender", "Gender")
Easy02.show(n=2, truncate=False)

+------+-----------------------------+
|Gender|Total Data Consumed (Prepaid)|
+------+-----------------------------+
|Male  |107881846224                 |
|Female|46529628579                  |
+------+-----------------------------+
only showing top 2 rows



In [40]:
from pyspark.sql.functions import desc
Easy03 = inner_join_post.groupBy('gender').agg({'total_volume_mb':'sum'}).sort(desc("sum(total_volume_mb)"))
Easy03 = Easy03.withColumnRenamed("sum(total_volume_mb)", "Total Data Consumed (Postpaid)")
Easy03 = Easy03.withColumnRenamed("gender", "Gender")
Easy03.show(n=2, truncate=False)

+------+------------------------------+
|Gender|Total Data Consumed (Postpaid)|
+------+------------------------------+
|Male  |6804560154                    |
|Female|2639851004                    |
+------+------------------------------+
only showing top 2 rows
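The two aggregation cells above join the CRM table to a usage table on `msisdn` and sum `total_volume_mb` per gender. The plain-Python miniature below mirrors that logic on made-up rows, which are for illustration only and are not taken from the datasets. It also assumes one CRM row per `msisdn`.

```python
from collections import defaultdict

# Made-up rows for illustration only
crm = {"a1": "Male", "b2": "Female", "c3": "Male"}           # msisdn -> gender
usage = [("a1", 120), ("a1", 30), ("b2", 75), ("zz", 999)]   # (msisdn, total_volume_mb)

totals = defaultdict(int)
for msisdn, volume_mb in usage:
    # Inner join: usage rows without a matching CRM row are skipped
    if msisdn in crm:
        totals[crm[msisdn]] += volume_mb

# Sort genders by total volume, largest first (the desc() sort in the cells above)
ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)

for gender, volume_mb in ranked:
    print(gender, volume_mb)
```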



#Stop Here

# Step 06. Exploration - Data Schema View
Now let's load the DataFrame and see the schema view of the Spark dataset

# Step 07. Exploration - Row Count
Since all the columns here are string/text formatted, there is no point in running statistical methods over their values. We still need to know the number of rows, so we take the total row count to get an overview of the data.

# Step 08. Exploration - Total Unique Row Count
Now we'll grab the total number of unique entries (the unique row count) of the Spark dataset to get an overview of duplicate data.

# Step 09. Exploration - Reviewing the NULL values in each column
Since the total row count and the unique row count are the same, there are no duplicate rows in the table. Next we count the NULL values per column to check whether any values are missing.

# Step 10. Exploration - Filtering the NULL values rows of Model entries
Roughly `1.25%` of the data are `NULL` values. Since this is negligible compared to the original row count, we will now filter the dataset to remove all rows with a `NULL` value in the `model_name` column.

# Step 11. Implementation - Run the SQL Command
Now that we know there are no NULL values and have optimised the rows for dual-SIM mobile sets, we can go straight to executing the SQL command to get the desired outcome. As part of the optimisation, we can drop the `week_number` column, as it is not relevant to this problem.