<a href="https://colab.research.google.com/github/vamshi25p/Artificial-Intelligence-Codes/blob/main/map-reduce.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark




In [2]:
import shutil

from pyspark.sql import SparkSession

# Location of the Titanic CSV on the runtime's filesystem
titanic_path = "/titanic.csv"

# Reuse the active Spark session if one exists, otherwise start a new one
spark = (
    SparkSession.builder
    .appName("TitanicAnalysis")
    .getOrCreate()
)

# Read the CSV: column names come from the first line, column types are inferred
titanic_df = spark.read.csv(
    titanic_path,
    inferSchema=True,
    header=True,
)

# Inspect the inferred schema, then preview the first five rows
titanic_df.printSchema()
titanic_df.show(n=5)


root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Siblings/Spouses Aboard: integer (nullable = true)
 |-- Parents/Children Aboard: integer (nullable = true)
 |-- Fare: double (nullable = true)

+--------+------+--------------------+------+----+-----------------------+-----------------------+-------+
|Survived|Pclass|                Name|   Sex| Age|Siblings/Spouses Aboard|Parents/Children Aboard|   Fare|
+--------+------+--------------------+------+----+-----------------------+-----------------------+-------+
|       0|     3|Mr. Owen Harris B...|  male|22.0|                      1|                      0|   7.25|
|       1|     1|Mrs. John Bradley...|female|38.0|                      1|                      0|71.2833|
|       1|     3|Miss. Laina Heikk...|female|26.0|                      0|                      0|  7.925|
|       1|     1|M

In [5]:
# Print the DataFrame schema again to double-check the exact column names
# (several contain spaces and slashes, e.g. "Siblings/Spouses Aboard") before
# rows are accessed by column name in the RDD operations.
titanic_df.printSchema()


root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Siblings/Spouses Aboard: integer (nullable = true)
 |-- Parents/Children Aboard: integer (nullable = true)
 |-- Fare: double (nullable = true)



In [6]:
# Build the small lookup table used by the map-side join: the mean Fare for
# each passenger class, collected to the driver as {Pclass: average fare}.
# NOTE(review): the cell that originally created `broadcast_fares` is not in
# the notebook, so this cell failed with NameError on a fresh kernel. It is
# rebuilt here as the per-class mean fare, which is consistent with the
# AvgFare values in the recorded output -- confirm this matches the intent.
avg_fare_by_class = {
    pclass: avg_fare
    for pclass, avg_fare in titanic_df.groupBy("Pclass").avg("Fare").collect()
}

# Broadcast the lookup so each task reads it locally instead of shuffling
broadcast_fares = spark.sparkContext.broadcast(avg_fare_by_class)

# Create an index column (like PassengerId) using zipWithIndex;
# every element becomes a (Row, index) pair
titanic_rdd_with_index = titanic_df.rdd.zipWithIndex()


def _with_avg_fare(indexed_row):
    """Return (index, passenger dict) with the class average fare added as "AvgFare"."""
    passenger, index = indexed_row
    record = passenger.asDict()
    # .get() yields None for a Pclass that is missing from the lookup
    record["AvgFare"] = broadcast_fares.value.get(passenger["Pclass"])
    return index, record


# Map-side join: enrich each row from the broadcast lookup, no shuffle needed
map_side_join_rdd = titanic_rdd_with_index.map(_with_avg_fare)

# Show some results
print(map_side_join_rdd.take(5))


[(0, {'Survived': 0, 'Pclass': 3, 'Name': 'Mr. Owen Harris Braund', 'Sex': 'male', 'Age': 22.0, 'Siblings/Spouses Aboard': 1, 'Parents/Children Aboard': 0, 'Fare': 7.25, 'AvgFare': 13.707707392197129}), (1, {'Survived': 1, 'Pclass': 1, 'Name': 'Mrs. John Bradley (Florence Briggs Thayer) Cumings', 'Sex': 'female', 'Age': 38.0, 'Siblings/Spouses Aboard': 1, 'Parents/Children Aboard': 0, 'Fare': 71.2833, 'AvgFare': 84.15468749999992}), (2, {'Survived': 1, 'Pclass': 3, 'Name': 'Miss. Laina Heikkinen', 'Sex': 'female', 'Age': 26.0, 'Siblings/Spouses Aboard': 0, 'Parents/Children Aboard': 0, 'Fare': 7.925, 'AvgFare': 13.707707392197129}), (3, {'Survived': 1, 'Pclass': 1, 'Name': 'Mrs. Jacques Heath (Lily May Peel) Futrelle', 'Sex': 'female', 'Age': 35.0, 'Siblings/Spouses Aboard': 1, 'Parents/Children Aboard': 0, 'Fare': 53.1, 'AvgFare': 84.15468749999992}), (4, {'Survived': 0, 'Pclass': 3, 'Name': 'Mr. William Henry Allen', 'Sex': 'male', 'Age': 35.0, 'Siblings/Spouses Aboard': 0, 'Parents/

In [9]:
# Step 0: Key every passenger row by its class so it can be joined on Pclass.
# NOTE(review): the cell that originally created `join_rdd` is not in the
# notebook, so this cell failed with NameError on a fresh kernel. It is
# rebuilt here as (Pclass, Row) pairs, which matches the shape of the
# recorded output -- confirm this matches the intent.
join_rdd = titanic_df.rdd.map(lambda passenger: (passenger["Pclass"], passenger))

# Step 1: Convert the broadcasted fares dictionary into an RDD of
# (Pclass, average fare) pairs. `broadcast_fares` must already exist in the
# kernel (the map-side join cell reads it as well).
broadcast_fares_rdd = spark.sparkContext.parallelize(
    list(broadcast_fares.value.items())
)

# Step 2: Reduce-side join: both RDDs are shuffled by key, producing
# (Pclass, (Row, average fare)) for every passenger
reduced_join_rdd = join_rdd.join(broadcast_fares_rdd)

# Show the result of the reduce-side join
print(reduced_join_rdd.take(5))


[(3, (Row(Survived=0, Pclass=3, Name='Mr. Owen Harris Braund', Sex='male', Age=22.0, Siblings/Spouses Aboard=1, Parents/Children Aboard=0, Fare=7.25), 13.707707392197129)), (3, (Row(Survived=1, Pclass=3, Name='Miss. Laina Heikkinen', Sex='female', Age=26.0, Siblings/Spouses Aboard=0, Parents/Children Aboard=0, Fare=7.925), 13.707707392197129)), (3, (Row(Survived=0, Pclass=3, Name='Mr. William Henry Allen', Sex='male', Age=35.0, Siblings/Spouses Aboard=0, Parents/Children Aboard=0, Fare=8.05), 13.707707392197129)), (3, (Row(Survived=0, Pclass=3, Name='Mr. James Moran', Sex='male', Age=27.0, Siblings/Spouses Aboard=0, Parents/Children Aboard=0, Fare=8.4583), 13.707707392197129)), (3, (Row(Survived=0, Pclass=3, Name='Master. Gosta Leonard Palsson', Sex='male', Age=2.0, Siblings/Spouses Aboard=3, Parents/Children Aboard=1, Fare=21.075), 13.707707392197129))]


In [11]:
# Word count task: count how often each whitespace-separated word occurs
# across all passenger names (case-insensitive).

# Pull out the Name column as an RDD of strings
passenger_names = titanic_df.rdd.map(lambda passenger: passenger["Name"])

# Break every name into its individual words
name_words = passenger_names.flatMap(lambda full_name: full_name.split())

# Emit a (lower-cased word, 1) pair per occurrence
word_pairs = name_words.map(lambda token: (token.lower(), 1))

# Sum the ones per word to obtain the final counts
name_rdd = word_pairs.reduceByKey(lambda left, right: left + right)

# Show the result of word count
print(name_rdd.take(5))


[('mr.', 513), ('owen', 2), ('harris', 5), ('braund', 2), ('mrs.', 125)]


In [12]:
def _is_survivor(passenger):
    """Return True when the row's Survived column equals 1."""
    return passenger["Survived"] == 1


# Keep only the passengers who survived
survived_rdd = titanic_df.rdd.filter(_is_survivor)

# Show some results of survivors
print(survived_rdd.take(5))


[Row(Survived=1, Pclass=1, Name='Mrs. John Bradley (Florence Briggs Thayer) Cumings', Sex='female', Age=38.0, Siblings/Spouses Aboard=1, Parents/Children Aboard=0, Fare=71.2833), Row(Survived=1, Pclass=3, Name='Miss. Laina Heikkinen', Sex='female', Age=26.0, Siblings/Spouses Aboard=0, Parents/Children Aboard=0, Fare=7.925), Row(Survived=1, Pclass=1, Name='Mrs. Jacques Heath (Lily May Peel) Futrelle', Sex='female', Age=35.0, Siblings/Spouses Aboard=1, Parents/Children Aboard=0, Fare=53.1), Row(Survived=1, Pclass=3, Name='Mrs. Oscar W (Elisabeth Vilhelmina Berg) Johnson', Sex='female', Age=27.0, Siblings/Spouses Aboard=0, Parents/Children Aboard=2, Fare=11.1333), Row(Survived=1, Pclass=2, Name='Mrs. Nicholas (Adele Achem) Nasser', Sex='female', Age=14.0, Siblings/Spouses Aboard=1, Parents/Children Aboard=0, Fare=30.0708)]


In [13]:
# Save the result of the map-side join as text.
# saveAsTextFile writes a *directory* of part files and fails if the target
# already exists, so remove any output left by a previous run first; this
# keeps the cell re-runnable. (shutil works here because the path is on the
# local filesystem of the runtime.)
map_side_join_output = "/content/titanic_map_side_join_result.txt"
shutil.rmtree(map_side_join_output, ignore_errors=True)
map_side_join_rdd.saveAsTextFile(map_side_join_output)


In [15]:
# Save the result of the reduce-side join as text.
# saveAsTextFile fails if the target directory already exists, so clear any
# output from a previous run first to keep the cell re-runnable.
reduce_side_join_output = "/content/titanic_reduce_side_join_result.txt"
shutil.rmtree(reduce_side_join_output, ignore_errors=True)
reduced_join_rdd.saveAsTextFile(reduce_side_join_output)

In [16]:
# Save the word-count result as text.
# saveAsTextFile fails if the target directory already exists, so clear any
# output from a previous run first to keep the cell re-runnable.
word_count_output = "/content/titanic_word_count_result.txt"
shutil.rmtree(word_count_output, ignore_errors=True)
name_rdd.saveAsTextFile(word_count_output)

In [17]:
# Save the surviving passengers' rows as text.
# saveAsTextFile fails if the target directory already exists, so clear any
# output from a previous run first to keep the cell re-runnable.
survived_output = "/content/titanic_survived_result.txt"
shutil.rmtree(survived_output, ignore_errors=True)
survived_rdd.saveAsTextFile(survived_output)