In [11]:
import string
import pyspark
from IPython.display import clear_output, display
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg,
    col,
    max,
    min,
    udf,
)

In [12]:
spark = SparkSession.builder.appName("TitanicData").getOrCreate()
# Read the CSV file into a DataFrame
df_pyspark = spark.read.option("header", "true").option("inferSchema", "true").option("header", "false").csv("data/titanic.csv")
# Define the column names of the Titanic dataset
column_names = ['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked','Timestamp']

In [13]:
# Assign the column names to the DataFrame
df_pyspark = df_pyspark.toDF(*column_names)
# Display the DataFrame schema
df_pyspark.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Timestamp: timestamp (nullable = true)



In [14]:
# Print the column names
df_pyspark.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked',
 'Timestamp']

In [15]:
df_pyspark.select("Name").show()

+--------------------+
|                Name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
|    Moran, Mr. James|
|McCarthy, Mr. Tim...|
|Palsson, Master. ...|
|Johnson, Mrs. Osc...|
|Nasser, Mrs. Nich...|
|Sandstrom, Miss. ...|
|Bonnell, Miss. El...|
|Saundercock, Mr. ...|
|Andersson, Mr. An...|
|Vestrom, Miss. Hu...|
|Hewlett, Mrs. (Ma...|
|Rice, Master. Eugene|
|Williams, Mr. Cha...|
|Vander Planke, Mr...|
|Masselmani, Mrs. ...|
+--------------------+
only showing top 20 rows



In [16]:
num_cols = [col for col, dtype in df_pyspark.dtypes if dtype in ['int', 'double']]
num_stats = df_pyspark.select(num_cols).describe().drop("summary")
num_stats.show()

+-----------------+-------------------+------------------+------------------+------------------+-------------------+-----------------+
|      PassengerId|           Survived|            Pclass|               Age|             SibSp|              Parch|             Fare|
+-----------------+-------------------+------------------+------------------+------------------+-------------------+-----------------+
|              891|                891|               891|               714|               891|                891|              891|
|            446.0| 0.3838383838383838| 2.308641975308642|29.679271708683473|0.5230078563411896|0.38159371492704824| 32.2042079685746|
|257.3538420152301|0.48659245426485753|0.8360712409770491|14.536482769437564|1.1027434322934315| 0.8060572211299488|49.69342859718089|
|                1|                  0|                 1|                 0|                 0|                  0|              0.0|
|              891|                  1|                

In [17]:
numeric_columns = df_pyspark.select("PassengerId", "Survived", "Pclass", "Age", "Fare", "SibSp", "Parch", "Embarked").columns
min_values = df_pyspark.select(numeric_columns).agg(*(min(c).alias(c) for c in numeric_columns)).collect()[0]
max_values = df_pyspark.select(numeric_columns).agg(*(max(c).alias(c) for c in numeric_columns)).collect()[0]
avg_values = df_pyspark.select(numeric_columns).agg(*(avg(c).alias(c) for c in numeric_columns)).collect()[0]
print("Minimum values:")
print(min_values)
print("Maximum values:")
print(max_values)
print("Average values:")
print(avg_values)

Minimum values:
Row(PassengerId=1, Survived=0, Pclass=1, Age=0, Fare=0.0, SibSp=0, Parch=0, Embarked='C')
Maximum values:
Row(PassengerId=891, Survived=1, Pclass=3, Age=80, Fare=512.3292, SibSp=8, Parch=6, Embarked='S')
Average values:
Row(PassengerId=446.0, Survived=0.3838383838383838, Pclass=2.308641975308642, Age=29.679271708683473, Fare=32.2042079685746, SibSp=0.5230078563411896, Parch=0.38159371492704824, Embarked=None)


In [18]:
def change_last_letter(x: str) -> StringType:
  words = x.split()
  for i in range(len(words)):
    words[i] = words[i][:-1] + "1"
  return " ".join(words)

In [19]:
# Create a UDF from the function
udf_change_last_letter = udf(change_last_letter, StringType())
# Apply the UDF to the "Name" column
df = df_pyspark.withColumn("Name", udf_change_last_letter(col("Name")))
# Print the results
df.head()

Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund1 Mr1 Owe1 Harri1', Sex='male', Age=22, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S', Timestamp=datetime.datetime(2020, 1, 1, 13, 45, 25))

In [22]:
# Sort DataFrame by the first column
sorted_df = df.orderBy(df.columns[0])
# Save the sorted DataFrame to Parquet file
sorted_df.write.parquet("./sorted_titanic/titanic.parquet")