<a href="https://colab.research.google.com/github/poojaGgunagi/DE_Case_study_2024_Aug/blob/main/DE_Case_Study_Aviso_Aug_2024.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=010b0302be59bebfa4f1b00cab9b7936f7555fa33fc1503244212cfa4c170621
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# **1. Create Spark Session**

In [12]:
spark=SparkSession.Builder().appName("usecase").getOrCreate()

# **2. Load the CSV File**

In [69]:
csv_df=spark.read.format("csv")\
                  .option("header","true")\
                  .option("inferSchema","true")\
                  .load("/content/Sample Input.csv")



In [70]:
csv_df.printSchema()

root
 |-- pclass: integer (nullable = true)
 |-- survived: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- sibsp: integer (nullable = true)
 |-- parch: integer (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: double (nullable = true)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)
 |-- boat: string (nullable = true)
 |-- body: integer (nullable = true)
 |-- home: string (nullable = true)



In [71]:
csv_df.count()

1309

In [72]:
csv_df.show()

+------+--------+--------------------+------+------+-----+-----+--------+--------+-------+--------+----+----+--------------------+
|pclass|survived|                name|   sex|   age|sibsp|parch|  ticket|    fare|  cabin|embarked|boat|body|                home|
+------+--------+--------------------+------+------+-----+-----+--------+--------+-------+--------+----+----+--------------------+
|     1|       1|Allen, Miss. Elis...|female|  29.0|    0|    0|   24160|211.3375|     B5|       S|   2|NULL|        St Louis, MO|
|     1|       1|Allison, Master. ...|  male|0.9167|    1|    2|  113781|  151.55|C22 C26|       S|  11|NULL|Montreal, PQ / Ch...|
|     1|       0|Allison, Miss. He...|female|   2.0|    1|    2|  113781|  151.55|C22 C26|       S|NULL|NULL|Montreal, PQ / Ch...|
|     1|       0|Allison, Mr. Huds...|  male|  30.0|    1|    2|  113781|  151.55|C22 C26|       S|NULL| 135|Montreal, PQ / Ch...|
|     1|       0|Allison, Mrs. Hud...|female|  25.0|    1|    2|  113781|  151.55|C

#  **3. Calculate Missing**

In [73]:
find_missing_values = csv_df.select([(csv_df[c].isNull().cast("int")).alias(c) for c in csv_df.columns])
total_missing_values=find_missing_values.agg(*[sum(c).alias(c) for c in find_missing_values.columns])
total_missing_values.show()


+------+--------+----+---+---+-----+-----+------+----+-----+--------+----+----+----+
|pclass|survived|name|sex|age|sibsp|parch|ticket|fare|cabin|embarked|boat|body|home|
+------+--------+----+---+---+-----+-----+------+----+-----+--------+----+----+----+
|     0|       0|   0|  0|263|    0|    0|     0|   1| 1014|       2| 823|1188| 564|
+------+--------+----+---+---+-----+-----+------+----+-----+--------+----+----+----+



# **4. Data Cleaning**

In [67]:
print(csv_df.count())

1309


In [74]:
# Loop through each column and fill nulls based on the data type
for column in csv_df.columns:
    column_type = csv_df.schema[column].dataType

    if isinstance(column_type, IntegerType):
        csv_df = csv_df.fillna({column: 0})
    elif isinstance(column_type, DoubleType):
        csv_df = csv_df.fillna({column: 0.0})
    elif isinstance(column_type, StringType):
        csv_df = csv_df.fillna({column: 'NA'})

# Show the updated DataFrame
csv_df.show()


+------+--------+--------------------+------+------+-----+-----+--------+--------+-------+--------+----+----+--------------------+
|pclass|survived|                name|   sex|   age|sibsp|parch|  ticket|    fare|  cabin|embarked|boat|body|                home|
+------+--------+--------------------+------+------+-----+-----+--------+--------+-------+--------+----+----+--------------------+
|     1|       1|Allen, Miss. Elis...|female|  29.0|    0|    0|   24160|211.3375|     B5|       S|   2|   0|        St Louis, MO|
|     1|       1|Allison, Master. ...|  male|0.9167|    1|    2|  113781|  151.55|C22 C26|       S|  11|   0|Montreal, PQ / Ch...|
|     1|       0|Allison, Miss. He...|female|   2.0|    1|    2|  113781|  151.55|C22 C26|       S|  NA|   0|Montreal, PQ / Ch...|
|     1|       0|Allison, Mr. Huds...|  male|  30.0|    1|    2|  113781|  151.55|C22 C26|       S|  NA| 135|Montreal, PQ / Ch...|
|     1|       0|Allison, Mrs. Hud...|female|  25.0|    1|    2|  113781|  151.55|C

# **5. Categorize Columns**

  You need to categorize columns into the following types:



*   Categorical: Columns with a limited number of distinct values.
*   Discrete: Columns with numeric but distinct values, typically integers.
*   Continuous: Columns with numeric values that are not discrete.
*   Text: Columns with free-form text.








# **Heuristics for Categorization:**


*   Categorical: Columns with fewer than a certain number of unique values (e.10)
*   Discrete: Numeric columns with integer values and a moderate number of unique values.
*   Continuous: Numeric columns with a large range of values.

*   Text: Columns identified by the presence of non-numeric data.






In [75]:
def categorize_columns(df):
    categorical = []
    discrete = []
    continuous = []
    text = []

    for column in df.columns:
        unique_count = df.select(column).distinct().count()
        column_type = df.schema[column].dataType

        if isinstance(column_type, StringType):
            if unique_count < 20:  # Assuming 20 as a threshold for categorical
                categorical.append(column)
            else:
                text.append(column)
        elif isinstance(column_type, IntegerType):
            if unique_count < 20:
                discrete.append(column)
            else:
                continuous.append(column)
        elif isinstance(column_type, DoubleType):
            continuous.append(column)

    return categorical, discrete, continuous, text

categorical, discrete, continuous, text = categorize_columns(csv_df)


In [76]:
categorical

['sex', 'embarked']

In [77]:
discrete

['pclass', 'survived', 'sibsp', 'parch']

In [80]:
continuous

['age', 'fare', 'body']

In [81]:
text

['name', 'ticket', 'cabin', 'boat', 'home']

# **6. Extract Statistics Based on Categories**

**Categorical and Discrete Columns**

In [82]:
for column in categorical + discrete:
    csv_df.groupBy(column).count().show()


+------+-----+
|   sex|count|
+------+-----+
|female|  466|
|  male|  842|
|   2.3|    1|
+------+-----+

+--------+-----+
|embarked|count|
+--------+-----+
|       Q|  123|
|      NA|    2|
|       C|  270|
|       S|  914|
+--------+-----+

+------+-----+
|pclass|count|
+------+-----+
|     1|  323|
|     3|  709|
|     2|  277|
+------+-----+

+--------+-----+
|survived|count|
+--------+-----+
|       1|  500|
|       0|  809|
+--------+-----+

+-----+-----+
|sibsp|count|
+-----+-----+
|    1|  319|
|    3|   20|
|    5|    6|
|    4|   22|
|    8|    9|
|    2|   42|
|    0|  891|
+-----+-----+

+-----+-----+
|parch|count|
+-----+-----+
|    1|  170|
|    6|    2|
|    3|    8|
|    5|    6|
|    9|    2|
|    4|    6|
|    2|  113|
|    0| 1002|
+-----+-----+



**Continuous Columns**

For continuous columns, extract the minimum and maximum values and create a histogram with frequency counts.

In [83]:
import numpy as np

for column in continuous:
    stats = csv_df.agg(min(column).alias('min'), max(column).alias('max')).collect()[0]
    min_val, max_val = stats['min'], stats['max']
    print(f"{column}: Min = {min_val}, Max = {max_val}")

    # Convert np.linspace output to a list
    bins = np.linspace(min_val, max_val, num=10).tolist()  # 10 bins
    histogram = csv_df.select(column).rdd.flatMap(lambda x: x).histogram(bins)
    print(f"Histogram for {column}: {histogram}")


age: Min = 0.0, Max = 80.0
Histogram for age: ([0.0, 8.88888888888889, 17.77777777777778, 26.666666666666668, 35.55555555555556, 44.44444444444444, 53.333333333333336, 62.22222222222223, 71.11111111111111, 80.0], [335, 82, 320, 250, 146, 99, 55, 19, 3])
fare: Min = 0.0, Max = 512.3292
Histogram for fare: ([0.0, 56.925466666666665, 113.85093333333333, 170.7764, 227.70186666666666, 284.62733333333335, 341.5528, 398.4782666666666, 455.4037333333333, 512.3292], [1111, 127, 33, 18, 16, 0, 0, 0, 4])
body: Min = 0, Max = 328
Histogram for body: ([0.0, 36.44444444444444, 72.88888888888889, 109.33333333333333, 145.77777777777777, 182.22222222222223, 218.66666666666666, 255.1111111111111, 291.55555555555554, 328.0], [1201, 18, 12, 13, 16, 11, 7, 14, 17])


 **Text Columns**

For text columns, extract word count information.

In [84]:
from pyspark.sql.functions import split, explode

for column in text:
    csv_df.select(explode(split(col(column), " ")).alias("word")).groupBy("word").count().show()


+----------+-----+
|      word|count|
+----------+-----+
|McDougald)|    1|
|  "Morley,|    1|
| Navratil,|    2|
|         K|    1|
|   Susanna|    1|
|    Force)|    1|
|   Thorpe)|    1|
|    Andree|    1|
|  Giuseppe|    1|
|    Foley,|    2|
| Harknett,|    1|
|   "Najib,|    1|
|   Torber,|    1|
|  Thornton|    3|
|  Andrews)|    1|
|Perreault,|    1|
| Sagesser,|    1|
|       der|    1|
|  "Harper,|    1|
|     Haas,|    1|
+----------+-----+
only showing top 20 rows

+------+-----+
|  word|count|
+------+-----+
|244270|    1|
|367230|    2|
| 18509|    1|
| 17757|    5|
|S.O.P.|    1|
| 14879|    7|
|363291|    3|
|363611|    1|
| 12750|    2|
| 24580|    1|
| 26360|    3|
| 31418|    1|
|  2696|    1|
|  2700|    1|
|    PC|   92|
| 14313|    1|
| 28034|    1|
|    SC|    2|
|237734|    1|
|  3902|    1|
+------+-----+
only showing top 20 rows

+----+-----+
|word|count|
+----+-----+
| A23|    1|
| B79|    1|
|  C6|    2|
| E44|    2|
| C22|    4|
|  A9|    1|
| D28|    2|
| 

**6. Output the Results as JSON**

Finally, convert the results into a JSON format and save the output.

In [85]:
import json

result = {
    "missing_values": missing_values.collect()[0].asDict(),
    "categorical": {col: csv_df.groupBy(col).count().collect() for col in categorical},
    "discrete": {col: csv_df.groupBy(col).count().collect() for col in discrete},
    "continuous": {col: {"min": min_val, "max": max_val, "histogram": histogram} for col in continuous},
    "text": {col: csv_df.select(explode(split(col, " ")).alias("word")).groupBy("word").count().collect() for col in text}
}

with open("output.json", "w") as f:
    json.dump(result, f)


In [86]:
spark.read.json("/content/output.json").show(truncate=False)

+---------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---