# Installing libraries

In [1]:
!pip install mysql-connector-python

Collecting mysql-connector-python
  Downloading mysql_connector_python-8.0.28-cp39-cp39-manylinux1_x86_64.whl (37.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m37.6/37.6 MB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0mm
Installing collected packages: mysql-connector-python
Successfully installed mysql-connector-python-8.0.28


In [2]:
!pip install pymysql

Collecting pymysql
  Downloading PyMySQL-1.0.2-py3-none-any.whl (43 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.8/43.8 KB[0m [31m926.7 kB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
[?25hInstalling collected packages: pymysql
Successfully installed pymysql-1.0.2


# Getting data.

In [3]:
!wget https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv

--2022-03-05 12:00:45--  https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv
Resolving web.stanford.edu (web.stanford.edu)... 171.67.215.200, 2607:f6d0:0:925a::ab43:d7c8
Connecting to web.stanford.edu (web.stanford.edu)|171.67.215.200|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 44225 (43K) [text/csv]
Saving to: ‘titanic.csv’


2022-03-05 12:00:46 (218 KB/s) - ‘titanic.csv’ saved [44225/44225]



# Imports

In [7]:
import pandas as pd 
import numpy
import matplotlib.pyplot as plt 
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import mysql.connector
from mysql.connector import Error
import sqlalchemy as db

# Spark initialization

In [8]:
# Build the Spark session (or reuse one that already exists) for all queries in this notebook.
spark = (
    SparkSession.builder
    .appName("PySparkExercise")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [9]:
# Load the downloaded CSV: the first row supplies the column names and the
# column types are inferred from the data. Options are passed as real booleans.
titanic_df = spark.read.csv('titanic.csv', header=True, inferSchema=True)

In [10]:
# Print the column names and inferred types of the loaded frame.
titanic_df.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Siblings/Spouses Aboard: integer (nullable = true)
 |-- Parents/Children Aboard: integer (nullable = true)
 |-- Fare: double (nullable = true)



In [157]:
# Expose the frame to spark.sql(...) under the view name `titanic_table`.
titanic_df.createOrReplaceTempView("titanic_table")

In [11]:
# Preview the first five rows of the loaded frame.
titanic_df.show(5)

+--------+------+--------------------+------+----+-----------------------+-----------------------+-------+
|Survived|Pclass|                Name|   Sex| Age|Siblings/Spouses Aboard|Parents/Children Aboard|   Fare|
+--------+------+--------------------+------+----+-----------------------+-----------------------+-------+
|       0|     3|Mr. Owen Harris B...|  male|22.0|                      1|                      0|   7.25|
|       1|     1|Mrs. John Bradley...|female|38.0|                      1|                      0|71.2833|
|       1|     3|Miss. Laina Heikk...|female|26.0|                      0|                      0|  7.925|
|       1|     1|Mrs. Jacques Heat...|female|35.0|                      1|                      0|   53.1|
|       0|     3|Mr. William Henry...|  male|35.0|                      0|                      0|   8.05|
+--------+------+--------------------+------+----+-----------------------+-----------------------+-------+
only showing top 5 rows



# Exercises

## 1.

In [166]:
# Exercise 1: percentage (rounded to 2 decimals) of rows with Age < 18 that have Survived = 1.
# The outer query counts surviving under-18 rows, the subquery counts all under-18 rows;
# .collect()[0][0] pulls the single value out of the one-row, one-column result.
percent_of_children_who_stayed_alive = spark.sql("""SELECT Round(100 * count(Name) / (SELECT count(Name)
                                                                                      FROM titanic_table
                                                                                      WHERE Age < 18), 2)
                                                                                      AS perc_of_children_alive
                                                    FROM titanic_table
                                                    WHERE Age < 18
                                                    AND Survived = 1""").collect()[0][0]

## 2.

In [168]:
# Exercise 2: percentage (rounded to 2 decimals) of rows with 18 <= Age <= 40 that have Survived = 1.
# The outer query counts surviving rows in that age range, the subquery counts all rows in it;
# .collect()[0][0] pulls the single value out of the one-row, one-column result.
percent_of_adults_before_40_who_stayed_alive = spark.sql("""SELECT Round(100 * count(Name) / (SELECT count(Name)
                                                                                             FROM titanic_table
                                                                                             WHERE Age >= 18
                                                                                             AND Age <= 40), 2)
                                                                                             AS perc_of_adults_alive
                                                            FROM titanic_table
                                                            WHERE Age >= 18
                                                            AND Age <= 40
                                                            AND Survived = 1""").collect()[0][0]

## 3.

In [170]:
# Number of rows with Sex == 'male' and 18 <= Age <= 40 (both bounds inclusive).
adults_male_under_40 = (
    titanic_df
    .where(F.col('Age').between(18, 40) & (F.col('Sex') == 'male'))
    .select('Name')
    .count()
)

In [171]:
# Number of rows with Sex == 'female' and 18 <= Age <= 40 (both bounds inclusive).
adults_female_under_40 = (
    titanic_df
    .where(F.col('Age').between(18, 40) & (F.col('Sex') == 'female'))
    .select('Name')
    .count()
)

In [172]:
# Number of rows with Sex == 'male', 18 <= Age <= 40 (inclusive) and Survived == 1.
adults_male_under_40_alive = (
    titanic_df
    .where(
        F.col('Age').between(18, 40)
        & (F.col('Sex') == 'male')
        & (F.col('Survived') == 1)
    )
    .select('Name')
    .count()
)

In [174]:
# Number of rows with Sex == 'female', 18 <= Age <= 40 (inclusive) and Survived == 1.
adults_female_under_40_alive = (
    titanic_df
    .where(
        F.col('Age').between(18, 40)
        & (F.col('Sex') == 'female')
        & (F.col('Survived') == 1)
    )
    .select('Name')
    .count()
)

In [177]:
# Exercise 3 (male): survivors as a percentage of all 18-40 year-old male rows, rounded to 2 decimals.
percent_of_adults_male_under_40_alive = round(100 * adults_male_under_40_alive / adults_male_under_40, 2)

In [178]:
# Exercise 3 (female): survivors as a percentage of all 18-40 year-old female rows, rounded to 2 decimals.
percent_of_adults_female_under_40_alive = round(100 * adults_female_under_40_alive / adults_female_under_40, 2)

## 4.

In [151]:
# Exercise 4: percentage (rounded to 2 decimals) of rows with Age > 40 that have Survived = 1.
# The query must read from `titanic_table`, the only temp view registered from titanic_df
# in this notebook; no view called `mytable` is ever created.
# NOTE: despite its name this variable holds a percentage, not a count. The name is kept
# because the insert cell further down refers to it.
number_of_adults_over_40 = spark.sql("""
    SELECT Round(100 * count(Name) / (SELECT count(Name)
                                      FROM titanic_table
                                      WHERE Age > 40), 2) AS Percent_of_alive
    FROM titanic_table
    WHERE Age > 40
    AND Survived = 1
""").collect()[0][0]

## 5.

In [188]:
# Number of rows with Sex == 'male' and Age > 40.
adults_male_over_40 = (
    titanic_df
    .where((F.col('Sex') == 'male') & (F.col('Age') > 40))
    .select('Name')
    .count()
)

In [189]:
# Number of rows with Sex == 'female' and Age > 40.
adults_female_over_40 = (
    titanic_df
    .where((F.col('Sex') == 'female') & (F.col('Age') > 40))
    .select('Name')
    .count()
)

In [190]:
# Number of rows with Sex == 'male', Age > 40 and Survived == 1.
adults_male_over_40_alive = (
    titanic_df
    .where(
        (F.col('Sex') == 'male')
        & (F.col('Age') > 40)
        & (F.col('Survived') == 1)
    )
    .select('Name')
    .count()
)

In [191]:
# Number of rows with Sex == 'female', Age > 40 and Survived == 1.
adults_female_over_40_alive = (
    titanic_df
    .where(
        (F.col('Sex') == 'female')
        & (F.col('Age') > 40)
        & (F.col('Survived') == 1)
    )
    .select('Name')
    .count()
)

In [192]:
# Exercise 5 (male): survivors as a percentage of all male rows with Age > 40, rounded to 2 decimals.
percent_of_adults_male_over_40_alive = round(100 * adults_male_over_40_alive / adults_male_over_40, 2)

In [193]:
# Exercise 5 (female): survivors as a percentage of all female rows with Age > 40, rounded to 2 decimals.
percent_of_adults_female_over_40_alive = round(100 * adults_female_over_40_alive / adults_female_over_40, 2)

## 6.

In [194]:
# Number of rows with Sex == 'male' and Pclass == 1.
number_of_male_class_1 = (
    titanic_df
    .where((F.col('Sex') == 'male') & (F.col('Pclass') == 1))
    .select('Name')
    .count()
)

In [195]:
# Number of rows with Sex == 'male' and Pclass == 2.
number_of_male_class_2 = (
    titanic_df
    .where((F.col('Sex') == 'male') & (F.col('Pclass') == 2))
    .select('Name')
    .count()
)

In [196]:
# Number of rows with Sex == 'male' and Pclass == 3.
number_of_male_class_3 = (
    titanic_df
    .where((F.col('Sex') == 'male') & (F.col('Pclass') == 3))
    .select('Name')
    .count()
)

In [197]:
# Number of rows with Sex == 'male', Pclass == 1 and Survived == 1.
number_of_male_class_1_alive = (
    titanic_df
    .where(
        (F.col('Sex') == 'male')
        & (F.col('Pclass') == 1)
        & (F.col('Survived') == 1)
    )
    .select('Name')
    .count()
)

In [200]:
# Number of rows with Sex == 'male', Pclass == 2 and Survived == 1.
number_of_male_class_2_alive = (
    titanic_df
    .where(
        (F.col('Sex') == 'male')
        & (F.col('Pclass') == 2)
        & (F.col('Survived') == 1)
    )
    .select('Name')
    .count()
)

In [201]:
# Number of rows with Sex == 'male', Pclass == 3 and Survived == 1.
number_of_male_class_3_alive = (
    titanic_df
    .where(
        (F.col('Sex') == 'male')
        & (F.col('Pclass') == 3)
        & (F.col('Survived') == 1)
    )
    .select('Name')
    .count()
)

In [202]:
# Number of rows with Sex == 'female' and Pclass == 1.
number_of_female_class_1 = (
    titanic_df
    .where((F.col('Sex') == 'female') & (F.col('Pclass') == 1))
    .select('Name')
    .count()
)

In [203]:
# Number of rows with Sex == 'female' and Pclass == 2.
number_of_female_class_2 = (
    titanic_df
    .where((F.col('Sex') == 'female') & (F.col('Pclass') == 2))
    .select('Name')
    .count()
)

In [204]:
# Number of rows with Sex == 'female' and Pclass == 3.
number_of_female_class_3 = (
    titanic_df
    .where((F.col('Sex') == 'female') & (F.col('Pclass') == 3))
    .select('Name')
    .count()
)

In [205]:
# Number of rows with Sex == 'female', Pclass == 1 and Survived == 1.
number_of_female_class_1_alive = (
    titanic_df
    .where(
        (F.col('Sex') == 'female')
        & (F.col('Pclass') == 1)
        & (F.col('Survived') == 1)
    )
    .select('Name')
    .count()
)

In [206]:
# Number of rows with Sex == 'female', Pclass == 2 and Survived == 1.
number_of_female_class_2_alive = (
    titanic_df
    .where(
        (F.col('Sex') == 'female')
        & (F.col('Pclass') == 2)
        & (F.col('Survived') == 1)
    )
    .select('Name')
    .count()
)

In [207]:
# Number of rows with Sex == 'female', Pclass == 3 and Survived == 1.
number_of_female_class_3_alive = (
    titanic_df
    .where(
        (F.col('Sex') == 'female')
        & (F.col('Pclass') == 3)
        & (F.col('Survived') == 1)
    )
    .select('Name')
    .count()
)

In [208]:
# Exercise 6 (male, Pclass 1): survivors as a percentage of all male class-1 rows, rounded to 2 decimals.
male_alive_class_1 = round(100 * number_of_male_class_1_alive / number_of_male_class_1, 2)

In [209]:
# Exercise 6 (male, Pclass 2): survivors as a percentage of all male class-2 rows, rounded to 2 decimals.
male_alive_class_2 = round(100 * number_of_male_class_2_alive / number_of_male_class_2, 2)

In [210]:
# Exercise 6 (male, Pclass 3): survivors as a percentage of all male class-3 rows, rounded to 2 decimals.
# The numerator must be the class-3 survivor count so that it matches the class-3 denominator.
male_alive_class_3 = round(100 * number_of_male_class_3_alive / number_of_male_class_3, 2)

In [211]:
# Exercise 6 (female, Pclass 1): survivors as a percentage of all female class-1 rows, rounded to 2 decimals.
female_alive_class_1 = round(100 * number_of_female_class_1_alive / number_of_female_class_1, 2)

In [212]:
# Exercise 6 (female, Pclass 2): survivors as a percentage of all female class-2 rows, rounded to 2 decimals.
female_alive_class_2 = round(100 * number_of_female_class_2_alive / number_of_female_class_2, 2)

In [213]:
# Exercise 6 (female, Pclass 3): survivors as a percentage of all female class-3 rows, rounded to 2 decimals.
female_alive_class_3 = round(100 * number_of_female_class_3_alive / number_of_female_class_3, 2)

# Database connection

In [None]:
# Open the MySQL connection used by the cursor / commit / close cells below.
# 3306 is a port number, not a host name, so it is passed as `port`.
# NOTE(review): the host is assumed to be the local machine - confirm for your environment.
# NOTE(review): credentials are hardcoded; move them to environment variables or
# getpass before sharing or committing this notebook.
conn = mysql.connector.connect(
  host="localhost",
  port=3306,
  user="root",
  password="pass",
  database="spark_results"
)

In [None]:
# Create the cursor on `conn`, the connection object opened above
# (no variable named `mydb` exists in this notebook).
mycursor = conn.cursor()

# Inserting data

In [None]:
# Rows to store: (exercise number, survival percentage, gender, passenger class).
# The statement lists one column and one %s placeholder per tuple element.
# NOTE(review): the fourth column name `pclass` is an assumption - confirm it
# against the schema of the `results` table.
sql = "INSERT INTO results (exercise_number, result, gender, pclass) VALUES (%s, %s, %s, %s)"
# Variable names match the ones defined in the exercise cells above.
# Exercise 6 rows store the survival percentages (male_alive_class_N /
# female_alive_class_N), like every other exercise, rather than raw head counts.
val = [
  (1, percent_of_children_who_stayed_alive, 'None', 'all'),
  (2, percent_of_adults_before_40_who_stayed_alive, 'None', 'all'),
  (3, percent_of_adults_male_under_40_alive, 'male', 'all'),
  (3, percent_of_adults_female_under_40_alive, 'female', 'all'),
  (4, number_of_adults_over_40, 'None', 'all'),
  (5, percent_of_adults_male_over_40_alive, 'male', 'all'),
  (5, percent_of_adults_female_over_40_alive, 'female', 'all'),
  (6, male_alive_class_1, 'male', '1'),
  (6, male_alive_class_2, 'male', '2'),
  (6, male_alive_class_3, 'male', '3'),
  (6, female_alive_class_1, 'female', '1'),
  (6, female_alive_class_2, 'female', '2'),
  (6, female_alive_class_3, 'female', '3'),
]

In [None]:
# Execute the INSERT statement once for every tuple in `val`.
mycursor.executemany(sql, val)

In [None]:
# Commit the inserted rows on `conn`, the connection object opened above
# (no variable named `mydb` exists in this notebook).
conn.commit()

In [None]:
# Close the MySQL connection now that the inserts are done.
conn.close()