In [33]:
import pyspark
import string
from IPython.display import clear_output, display
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    max,
    min,
    udf,
    avg,
    col
)
spark = SparkSession.builder.appName("Assignment").getOrCreate()

In [25]:
updated_df = spark.read.option("header", "true").option("inferSchema", "true").option("header", "false").csv("titanic.csv")
column_names = ['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp','Parch','Ticket','Fare','Cabin','Embarked','Timestamp']

In [26]:
updated_df = updated_df.toDF(*column_names)
updated_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Timestamp: timestamp (nullable = true)



In [27]:
updated_df.schema.names

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked',
 'Timestamp']

## For numerical columns, calculate minimum, maximum and average values.

In [28]:
updated_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Timestamp: timestamp (nullable = true)



In [29]:
from pyspark.sql.functions import col

numerical_columns = [col_name for col_name, col_type in updated_df.dtypes if col_type in ["int", "double"]]

statistics = {}
for column in numerical_columns:
    column_stats = updated_df.select(col(column)).describe().collect()
    minimum = float(column_stats[3][column]) if column_stats[3][column] is not None else None
    maximum = float(column_stats[4][column]) if column_stats[4][column] is not None else None
    average = float(column_stats[1][column]) if column_stats[1][column] is not None else None
    stats = {
        "minimum": minimum,
        "maximum": maximum,
        "average": average
    }
    statistics[column] = stats

# Print the calculated statistics
for column, stats in statistics.items():
    print(f"Column '{column}':")
    print(f"  Minimum: {stats['minimum']}")
    print(f"  Maximum: {stats['maximum']}")
    print(f"  Average: {stats['average']}")
    print()


Column 'PassengerId':
  Minimum: 1.0
  Maximum: 891.0
  Average: 446.0

Column 'Survived':
  Minimum: 0.0
  Maximum: 1.0
  Average: 0.3838383838383838

Column 'Pclass':
  Minimum: 1.0
  Maximum: 3.0
  Average: 2.308641975308642

Column 'Age':
  Minimum: 0.0
  Maximum: 80.0
  Average: 29.679271708683473

Column 'SibSp':
  Minimum: 0.0
  Maximum: 8.0
  Average: 0.5230078563411896

Column 'Parch':
  Minimum: 0.0
  Maximum: 6.0
  Average: 0.38159371492704824

Column 'Fare':
  Minimum: 0.0
  Maximum: 512.3292
  Average: 32.2042079685746



## For categorical columns, create and apply UDF that will change the last letter of every word to “1”.

In [30]:
from pyspark.sql.types import StringType

def change_last_letter(x: str) -> StringType:
    words = x.split()
    for i in range(len(words)):
        words[i] = words[i][:-1] + "1"
        return " ".join(words)
udf_change_last_letter = udf(change_last_letter, StringType())
df = updated_df.withColumn("Sex", udf_change_last_letter(col("Sex")))
df.head()

Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='mal1', Age=22, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S', Timestamp=datetime.datetime(2020, 1, 1, 13, 45, 25))

## Sort DataFrame by the first column and save the results to the Parquet file.

In [32]:
sorted_df = df.orderBy(df.columns[0])
sorted_df.write.parquet("titanic_output_data.parquet")