In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("myApp").master("local[*]").getOrCreate()

In [2]:
#!/bin/bash
! curl -L -o incidents.zip https://www.kaggle.com/api/v1/datasets/download/thedevastator/global-shark-attack-incidents
! unzip incidents.zip

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100  593k  100  593k    0     0  1228k      0 --:--:-- --:--:-- --:--:-- 1228k
Archive:  incidents.zip
  inflating: GSAF5.xls.csv           
  inflating: Sharks Attack Men More .csv  
  inflating: Sharks Attack Men More .json  
  inflating: download-2019-02-20T23-55-18-113Z.png  


In [3]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, \
                              BooleanType, DateType, FloatType

from pyspark.sql.functions import col

file_path = "GSAF5.xls.csv"
shark_df = spark.read.\
  csv(file_path, header=True, inferSchema=True)\
  .select("index", "Date", "Year", "Type", "Country", "Area", "Location",
          "Activity", "Name", "Age", "Injury", "Fatal (Y/N)", "Time",
          "Species ")\
  .withColumnRenamed("Species ", "Species")\
  .withColumn("Year", col("Year").cast(IntegerType()))\
  .withColumn("Age", col("Age").cast(IntegerType()))\
  .withColumn("Fatal", col("Fatal (Y/N)").cast(BooleanType()))

shark_df.show()
shark_df.printSchema()

+-----+--------------------+----+------------+----------------+--------------------+--------------------+--------------------+-----------------+----+--------------------+-----------+-------------+--------------------+-----+
|index|                Date|Year|        Type|         Country|                Area|            Location|            Activity|             Name| Age|              Injury|Fatal (Y/N)|         Time|             Species|Fatal|
+-----+--------------------+----+------------+----------------+--------------------+--------------------+--------------------+-----------------+----+--------------------+-----------+-------------+--------------------+-----+
|    0|         05-Feb-2020|2020|  Unprovoked|             USA|                Maui|                NULL|Stand-Up Paddle b...|             NULL|NULL|No injury, but pa...|          N|        09h40|         Tiger shark|false|
|    1|Reported 30-Jan-2020|2020|    Provoked|         BAHAMAS|              Exumas|                NULL

In [4]:
italy_count = shark_df.filter(shark_df.Country == 'ITALY').count()
italy_count

71

In [5]:
italy = shark_df\
  .filter(shark_df.Country == 'ITALY')\
  .select("Date", "year", "area", "location", "age")\
  .sort("year", ascending=False)

In [6]:
italy.show(italy_count, truncate=False)

+------------------------------------+----+-----------------------------------+-----------------------------------------------------------------------------+----+
|Date                                |year|area                               |location                                                                     |age |
+------------------------------------+----+-----------------------------------+-----------------------------------------------------------------------------+----+
|29-Mar-2015                         |2015|Sardinia                           |NULL                                                                         |43  |
|10-Jun-2012                         |2012|Sardinia                           |Muravera                                                                     |57  |
|20-Aug-2006                         |2006|NULL                               |Lampedusa Island                                                             |NULL|
|03-Aug-2001          

In [7]:
from pyspark.sql.functions import regexp_replace, regexp_extract, when

pattern1 = r"(\d{2}-[A-Za-z]{3}-\d{4})"  # <cyfra><cyfra>-<litera><litera><litera>-<cyfra><cyfra><cyfra><cyfra>
pattern2 = r"([A-Za-z]{3}-\d{4})"         # <litera><litera><litera>-<cyfra><cyfra><cyfra><cyfra>
pattern3 = r"(\d{4})"                     # <cyfra><cyfra><cyfra><cyfra>


italy_date = italy\
  .withColumn("date2", \
                    when(regexp_extract(col("date"), pattern1, 0) != "", regexp_extract(col("date"), pattern1, 0))\
                    .when(regexp_extract(col("date"), pattern2, 0) != "", regexp_extract(col("date"), pattern2, 0))\
                    .when(regexp_extract(col("date"), pattern3, 0) != "", regexp_extract(col("date"), pattern3, 0))\
                    .otherwise(None))\
  .sort("year", ascending=False)

italy_date.show(italy_count)

+--------------------+----+--------------------+--------------------+----+-----------+
|                Date|year|                area|            location| age|      date2|
+--------------------+----+--------------------+--------------------+----+-----------+
|         29-Mar-2015|2015|            Sardinia|                NULL|  43|29-Mar-2015|
|         10-Jun-2012|2012|            Sardinia|            Muravera|  57|10-Jun-2012|
|         20-Aug-2006|2006|                NULL|    Lampedusa Island|NULL|20-Aug-2006|
|         03-Aug-2001|2001|                NULL|              Rimini|NULL|03-Aug-2001|
|         24-Sep-1999|1999|        Adriatic Sea|       San Benedetto|NULL|24-Sep-1999|
|         27-Aug-1998|1998|      Marches region|12 miles off Seni...|NULL|27-Aug-1998|
|         30-Jul-1991|1991|        Ligurian Sea|Portofino, 20 mil...|  40|30-Jul-1991|
|         06-Jun-1989|1989|             Tuscany|Marinella, betwee...|NULL|06-Jun-1989|
|         02-Feb-1989|1989|      Tyrrhenian

In [8]:
from pyspark.sql.functions import length, to_date, current_date, lit

italy_date = italy\
  .withColumn("date2", \
                    when(regexp_extract(col("date"), pattern1, 0) != "", regexp_extract(col("date"), pattern1, 0))\
                    .when(regexp_extract(col("date"), pattern2, 0) != "", regexp_extract(col("date"), pattern2, 0))\
                    .when(regexp_extract(col("date"), pattern3, 0) != "", regexp_extract(col("date"), pattern3, 0))\
                    .otherwise(None))\
  .withColumn('dl', length(col("date2")))\
  .withColumn("date3", \
              when(length(col("date2"))==4, to_date(col("date2"), 'yyyy'))\
              .when(length(col("date2"))==8, to_date(col("date2"), 'MMM-yyyy'))\
              .when(length(col("date2"))==11, to_date(col("date2"), 'dd-MMM-yyyy'))\
              .otherwise(None))\
  .filter(col("date3") <= current_date())\
  .filter(col("date3") >= to_date(lit('1900-01-01')))\
  .sort("year", ascending=False)

italy_count = italy_date.count()
print(italy_count)
italy_date.show(italy_count)

61
+--------------------+----+--------------------+--------------------+----+-----------+---+----------+
|                Date|year|                area|            location| age|      date2| dl|     date3|
+--------------------+----+--------------------+--------------------+----+-----------+---+----------+
|         29-Mar-2015|2015|            Sardinia|                NULL|  43|29-Mar-2015| 11|2015-03-29|
|         10-Jun-2012|2012|            Sardinia|            Muravera|  57|10-Jun-2012| 11|2012-06-10|
|         20-Aug-2006|2006|                NULL|    Lampedusa Island|NULL|20-Aug-2006| 11|2006-08-20|
|         03-Aug-2001|2001|                NULL|              Rimini|NULL|03-Aug-2001| 11|2001-08-03|
|         24-Sep-1999|1999|        Adriatic Sea|       San Benedetto|NULL|24-Sep-1999| 11|1999-09-24|
|         27-Aug-1998|1998|      Marches region|12 miles off Seni...|NULL|27-Aug-1998| 11|1998-08-27|
|         30-Jul-1991|1991|        Ligurian Sea|Portofino, 20 mil...|  40|30-Ju

In [9]:
italy_date.dropna?

In [10]:
italy_date.dropna(how="any").show(italy_count)

+--------------------+----+--------------------+--------------------+---+-----------+---+----------+
|                Date|year|                area|            location|age|      date2| dl|     date3|
+--------------------+----+--------------------+--------------------+---+-----------+---+----------+
|         10-Jun-2012|2012|            Sardinia|            Muravera| 57|10-Jun-2012| 11|2012-06-10|
|         30-Jul-1991|1991|        Ligurian Sea|Portofino, 20 mil...| 40|30-Jul-1991| 11|1991-07-30|
|         02-Feb-1989|1989|      Tyrrhenian Sea|Golfo di Baratti,...| 47|02-Feb-1989| 11|1989-02-02|
|         22-Aug-1988|1988|         Manfredonia|           Ippocampo| 16|22-Aug-1988| 11|1988-08-22|
|Mid Jul-1985 or m...|1985|              Sicily|         Punta Secca| 13|   Jul-1985|  8|1985-07-01|
|Reported 02-Jun-1976|1976|Reggio Calabria P...|            Bovalino| 46|02-Jun-1976| 11|1976-06-02|
|         25-Apr-1975|1975|      Genoa Province|             Cervara| 37|25-Apr-1975| 11|19

In [11]:
italy_date.dropna(subset=["location"]).show(italy_count)


+--------------------+----+--------------------+--------------------+----+-----------+---+----------+
|                Date|year|                area|            location| age|      date2| dl|     date3|
+--------------------+----+--------------------+--------------------+----+-----------+---+----------+
|         10-Jun-2012|2012|            Sardinia|            Muravera|  57|10-Jun-2012| 11|2012-06-10|
|         20-Aug-2006|2006|                NULL|    Lampedusa Island|NULL|20-Aug-2006| 11|2006-08-20|
|         03-Aug-2001|2001|                NULL|              Rimini|NULL|03-Aug-2001| 11|2001-08-03|
|         24-Sep-1999|1999|        Adriatic Sea|       San Benedetto|NULL|24-Sep-1999| 11|1999-09-24|
|         27-Aug-1998|1998|      Marches region|12 miles off Seni...|NULL|27-Aug-1998| 11|1998-08-27|
|         30-Jul-1991|1991|        Ligurian Sea|Portofino, 20 mil...|  40|30-Jul-1991| 11|1991-07-30|
|         06-Jun-1989|1989|             Tuscany|Marinella, betwee...|NULL|06-Jun-1

In [12]:
italy_date.fillna(value="------").show(italy_count)

+--------------------+----+--------------------+--------------------+----+-----------+---+----------+
|                Date|year|                area|            location| age|      date2| dl|     date3|
+--------------------+----+--------------------+--------------------+----+-----------+---+----------+
|         29-Mar-2015|2015|            Sardinia|              ------|  43|29-Mar-2015| 11|2015-03-29|
|         10-Jun-2012|2012|            Sardinia|            Muravera|  57|10-Jun-2012| 11|2012-06-10|
|         20-Aug-2006|2006|              ------|    Lampedusa Island|NULL|20-Aug-2006| 11|2006-08-20|
|         03-Aug-2001|2001|              ------|              Rimini|NULL|03-Aug-2001| 11|2001-08-03|
|         24-Sep-1999|1999|        Adriatic Sea|       San Benedetto|NULL|24-Sep-1999| 11|1999-09-24|
|         27-Aug-1998|1998|      Marches region|12 miles off Seni...|NULL|27-Aug-1998| 11|1998-08-27|
|         30-Jul-1991|1991|        Ligurian Sea|Portofino, 20 mil...|  40|30-Jul-1

In [13]:
italy_date.fillna({"area" : "ITALY", "age" : -1}).show(italy_count)

+--------------------+----+--------------------+--------------------+---+-----------+---+----------+
|                Date|year|                area|            location|age|      date2| dl|     date3|
+--------------------+----+--------------------+--------------------+---+-----------+---+----------+
|         29-Mar-2015|2015|            Sardinia|                NULL| 43|29-Mar-2015| 11|2015-03-29|
|         10-Jun-2012|2012|            Sardinia|            Muravera| 57|10-Jun-2012| 11|2012-06-10|
|         20-Aug-2006|2006|               ITALY|    Lampedusa Island| -1|20-Aug-2006| 11|2006-08-20|
|         03-Aug-2001|2001|               ITALY|              Rimini| -1|03-Aug-2001| 11|2001-08-03|
|         24-Sep-1999|1999|        Adriatic Sea|       San Benedetto| -1|24-Sep-1999| 11|1999-09-24|
|         27-Aug-1998|1998|      Marches region|12 miles off Seni...| -1|27-Aug-1998| 11|1998-08-27|
|         30-Jul-1991|1991|        Ligurian Sea|Portofino, 20 mil...| 40|30-Jul-1991| 11|19

In [14]:
from pyspark.sql.functions import avg
mean_age = shark_df.select(avg("age").alias("avg_age")).collect()[0]["avg_age"]
mean_age

27.512356321839082

In [15]:
italy_date.fillna(subset=["age"], value=mean_age).show(italy_count)

+--------------------+----+--------------------+--------------------+---+-----------+---+----------+
|                Date|year|                area|            location|age|      date2| dl|     date3|
+--------------------+----+--------------------+--------------------+---+-----------+---+----------+
|         29-Mar-2015|2015|            Sardinia|                NULL| 43|29-Mar-2015| 11|2015-03-29|
|         10-Jun-2012|2012|            Sardinia|            Muravera| 57|10-Jun-2012| 11|2012-06-10|
|         20-Aug-2006|2006|                NULL|    Lampedusa Island| 27|20-Aug-2006| 11|2006-08-20|
|         03-Aug-2001|2001|                NULL|              Rimini| 27|03-Aug-2001| 11|2001-08-03|
|         24-Sep-1999|1999|        Adriatic Sea|       San Benedetto| 27|24-Sep-1999| 11|1999-09-24|
|         27-Aug-1998|1998|      Marches region|12 miles off Seni...| 27|27-Aug-1998| 11|1998-08-27|
|         30-Jul-1991|1991|        Ligurian Sea|Portofino, 20 mil...| 40|30-Jul-1991| 11|19

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce

# Create Spark session
spark = SparkSession.builder.appName("PreferredContactExample").getOrCreate()

# Sample customer data
data = [
    (1, "customer1@example.com", None, "New York, 123 Example St"),
    (2, None, "+1 555 678 9012", "Los Angeles, 456 Another St"),
    (3, None, None, "Chicago, 789 Third St"),
    (4, None, None, None)
]
columns = ["id", "email", "phone", "mailing_address"]

df = spark.createDataFrame(data, columns)

# Use coalesce to determine the preferred contact method
df = df.withColumn("preferred_contact", coalesce("email", "phone", "mailing_address"))

# Show the results
df.show(truncate=False)


+---+---------------------+---------------+---------------------------+---------------------+
|id |email                |phone          |mailing_address            |preferred_contact    |
+---+---------------------+---------------+---------------------------+---------------------+
|1  |customer1@example.com|NULL           |New York, 123 Example St   |customer1@example.com|
|2  |NULL                 |+1 555 678 9012|Los Angeles, 456 Another St|+1 555 678 9012      |
|3  |NULL                 |NULL           |Chicago, 789 Third St      |Chicago, 789 Third St|
|4  |NULL                 |NULL           |NULL                       |NULL                 |
+---+---------------------+---------------+---------------------------+---------------------+

