In [None]:
from abc import ABC, abstractmethod
import pandas as pd
from pyspark.sql import SparkSession

class DataExtractor(ABC):
    """Abstract base class for extractors; subclasses must implement ``extract``."""

    @abstractmethod
    def extract(self):
        """Produce the extracted data.

        Concrete subclasses override this method; the base body is empty and
        returns ``None``.
        """

class ExcelExtractor(DataExtractor):
    """Extract worksheets of an Excel workbook as Spark DataFrames.

    Parameters
    ----------
    path : str or path-like
        Location of the workbook; passed unchanged to ``pandas.read_excel``.
    sheets : iterable of str, optional
        Worksheet names to load, in the order ``extract`` returns them.
        Defaults to ``DEFAULT_SHEETS``.
    """

    # Sheets loaded when the caller does not request a specific set.
    DEFAULT_SHEETS = ("film", "inventory", "rental", "customer", "store")

    def __init__(self, path, sheets=None):
        self.path = path
        self.sheets = tuple(sheets) if sheets is not None else self.DEFAULT_SHEETS

    def extract(self):
        """Read every configured sheet and return one Spark DataFrame per sheet.

        Column names are stripped of surrounding whitespace; cell values are
        left exactly as pandas read them.

        Returns
        -------
        tuple of pyspark.sql.DataFrame
            One DataFrame per entry of ``self.sheets``, in the same order.
            With the default sheets this is
            ``(film, inventory, rental, customer, store)``.
        """
        spark = SparkSession.builder.getOrCreate()

        # Passing a list as sheet_name returns {sheet name: pandas DataFrame},
        # so all sheets come from a single read_excel call instead of one
        # call per sheet.
        frames_by_sheet = pd.read_excel(self.path, sheet_name=list(self.sheets))

        extracted = []
        for sheet in self.sheets:
            sheet_df = spark.createDataFrame(frames_by_sheet[sheet])
            # Rename all columns in one projection rather than one
            # withColumnRenamed call per column.
            sheet_df = sheet_df.toDF(*(name.strip() for name in sheet_df.columns))
            extracted.append(sheet_df)

        # Results are handed back to the caller directly; nothing is written
        # into globals(), so existing notebook variables are not overwritten
        # as a side effect of calling this method.
        return tuple(extracted)


In [None]:
# Build the extractor for the workbook. The path is relative, so the file is
# looked up from the notebook's current working directory.
extractor = ExcelExtractor("Films_2.xlsx")


In [None]:
# extract() returns one Spark DataFrame per sheet, in the order
# film, inventory, rental, customer, store.
df_film, df_inventory, df_rental, df_customer, df_store = extractor.extract()

In [None]:

# Preview the raw inventory sheet. The recorded output below shows a dirty
# store_id value (2*$#") and leading whitespace in last_update.
df_inventory.show()

+------------+-------+--------+--------------------+
|inventory_id|film_id|store_id|         last_update|
+------------+-------+--------+--------------------+
|           1|      1|       1| 2006-02-15 05:09:17|
|           2|      1|       1| 2006-02-15 05:09:17|
|           3|      1|       1| 2006-02-15 05:09:17|
|           4|      1|       1| 2006-02-15 05:09:17|
|           5|      1|       2| 2006-02-15 05:09:17|
|           6|      1|       2| 2006-02-15 05:09:17|
|           7|      1|       2| 2006-02-15 05:09:17|
|           8|      1|       2| 2006-02-15 05:09:17|
|           9|      2|       2| 2006-02-15 05:09:17|
|          10|      2|       2| 2006-02-15 05:09:17|
|          11|      2|       2| 2006-02-15 05:09:17|
|          12|      3|       2| 2006-02-15 05:09:17|
|          13|      3|       2| 2006-02-15 05:09:17|
|          14|      3|       2| 2006-02-15 05:09:17|
|          15|      3|   2*$#"| 2006-02-15 05:09:17|
|          16|      4|       2| 2006-02-15 05:

In [None]:
# !pip install findspark
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, col, to_timestamp, trim, substring, regexp_replace
import pandas as pd
import os
import sys


# Initialise findspark before the Spark session is built.
findspark.init()

# Create (or reuse) the Spark session used by the cells below.
spark = (
    SparkSession.builder
    .appName('test')
    .getOrCreate()
)

In [None]:
# Preview the first 5 rows of `df`.
# NOTE(review): `df` is not assigned in any cell above this one; the recorded
# output has the film columns, so this presumably ran after a cell that loaded
# the "film" sheet -- confirm the intended cell order.
df.show(5)

+--------------------+--------------------+------------+-----------+---------------+-----------+------+----------------+---------------+------+----------------+--------------------+
|               title|         description|release_year|language_id|rental_duration|rental_rate|length|replacement_cost|num_voted_users|rating|special_features|         last_update|
+--------------------+--------------------+------------+-----------+---------------+-----------+------+----------------+---------------+------+----------------+--------------------+
|      FIDELITY DEVIL| A Awe-Inspiring ...|        2006|          1|              5|       4.99|   118|           11.99|          26500|     G|        Trailers| 2020-01-25 14:40:46|
| FORRESTER COMANC...| A Fateful Tale o...|        2006|          1|              7|       4.99|   112|           22.99|          61150| NC-17|        Trailers| 2020-01-25 14:40:46|
|    INSTINCT AIRPORT| A Touching Docum...|        2006|          1|              4|      

In [None]:
# --- film sheet: load and clean -------------------------------------------
df_ax = pd.read_excel("Films_2.xlsx", sheet_name="film")
df = spark.createDataFrame(df_ax)

# Strip surrounding whitespace from every column name in one projection.
df = df.toDF(*(name.strip() for name in df.columns))

df = df.drop('original_language_id')

# Target Spark type for each numeric column. A single mapping replaces the
# separate list plus the per-iteration membership test.
numeric_types = {
    "film_id": "int",
    "release_year": "int",
    "language_id": "int",
    "rental_duration": "int",
    "rental_rate": "double",
    "length": "int",
    "replacement_cost": "double",
    "num_voted_users": "int",
}
cols_n = list(numeric_types)

cols_s = ['title',
          'description',
          'rating',
          'special_features']

# Raw string: "\d" inside a normal string literal is an invalid escape
# sequence (a SyntaxWarning on recent Python versions). The pattern text
# itself is unchanged: the first run of digits, optional dots, more digits.
number_pattern = r"\d{1,}\.{0,}\d{0,}"

for name, spark_type in numeric_types.items():
    # Keep only the numeric part of the value, then cast it.
    df = df.withColumn(name, regexp_extract(name, number_pattern, 0).cast(spark_type))

for name in cols_s:
    df = df.withColumn(name, trim(name))

df = df.withColumn(
    "last_update",
    to_timestamp(trim("last_update"), "yyyy-MM-dd HH:mm:ss"),
)

# Keep up to 1000 characters of the trimmed description starting at
# position 3, i.e. drop its first two characters.
# NOTE(review): the recorded outputs suggest this removes a leading "A " from
# each description -- confirm that every description starts that way.
df = df.withColumn("description", substring("description", 3, 1000))

df = df.drop_duplicates()

+-------+-------------------+--------------------+------------+-----------+---------------+-----------+------+----------------+---------------+------+----------------+-------------------+
|film_id|              title|         description|release_year|language_id|rental_duration|rental_rate|length|replacement_cost|num_voted_users|rating|special_features|        last_update|
+-------+-------------------+--------------------+------------+-----------+---------------+-----------+------+----------------+---------------+------+----------------+-------------------+
|    129|         CAUSE DATE|Taut Tale of a Ex...|        2006|          1|              3|       2.99|   179|           16.99|          53300|     R|    Commentaries|2020-01-25 14:40:46|
|    397|      HANKY OCTOBER|Boring Epistle of...|        2006|          1|              5|       2.99|   107|           26.99|          58050| NC-17|        Trailers|2020-01-25 14:40:46|
|      6|       AGENT TRUMAN|Intrepid Panorama...|        20

In [None]:
# --- inventory sheet: load and clean --------------------------------------
df_ax = pd.read_excel("Films_2.xlsx", sheet_name="inventory")
df = spark.createDataFrame(df_ax)

# Strip surrounding whitespace from every column name in one projection.
df = df.toDF(*(name.strip() for name in df.columns))

# store_id contains junk such as 2*$#" (see the raw preview). Use r"\d+"
# rather than "\d{0,}": the latter can match the empty string at position 0,
# so a value with any leading non-digit character would become null instead
# of yielding its digits. The raw string also avoids the invalid "\d" escape.
df = df.withColumn("store_id", regexp_extract("store_id", r"\d+", 0).cast("int"))

df = df.withColumn(
    "last_update",
    to_timestamp(trim("last_update"), "yyyy-MM-dd HH:mm:ss"),
)

for name in ["inventory_id", "film_id"]:
    df = df.withColumn(name, col(name).cast("int"))


In [None]:
# Cast the id columns first so the schema printed below is the final one.
# Printing before the cast showed inventory_id / film_id as long in the
# recorded output. The cast is idempotent, so re-running it is harmless.
for name in ["inventory_id", "film_id"]:
    df = df.withColumn(name, col(name).cast("int"))

df.printSchema()

root
 |-- inventory_id: long (nullable = true)
 |-- film_id: long (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- last_update: timestamp (nullable = true)

+------------+-------+--------+-------------------+
|inventory_id|film_id|store_id|        last_update|
+------------+-------+--------+-------------------+
|           1|      1|       1|2006-02-15 05:09:17|
|           2|      1|       1|2006-02-15 05:09:17|
|           3|      1|       1|2006-02-15 05:09:17|
|           4|      1|       1|2006-02-15 05:09:17|
|           5|      1|       2|2006-02-15 05:09:17|
|           6|      1|       2|2006-02-15 05:09:17|
|           7|      1|       2|2006-02-15 05:09:17|
|           8|      1|       2|2006-02-15 05:09:17|
|           9|      2|       2|2006-02-15 05:09:17|
|          10|      2|       2|2006-02-15 05:09:17|
|          11|      2|       2|2006-02-15 05:09:17|
|          12|      3|       2|2006-02-15 05:09:17|
|          13|      3|       2|2006-02-15 05:09:

In [None]:
# List the column names of `df`.
# NOTE(review): the recorded output lists the rental columns, but the cell
# that loads the "rental" sheet comes after this one -- confirm cell order.
df.columns

['rental_id',
 'rental_date',
 'inventory_id',
 'customer_id',
 'return_date',
 'staff_id',
 'last_update']

In [None]:
# --- rental sheet: load and clean -----------------------------------------
df_ax = pd.read_excel("Films_2.xlsx", sheet_name="rental")
df = spark.createDataFrame(df_ax)

# Column names with surrounding whitespace removed.
df = df.toDF(*[name.strip() for name in df.columns])

cols = ['rental_date', 'return_date', 'last_update']
cols_n = ["rental_id", "inventory_id", "customer_id", "staff_id"]

# Date columns: trim, then parse as timestamps.
for ts_col in cols:
    df = df.withColumn(ts_col, to_timestamp(trim(ts_col), "yyyy-MM-dd HH:mm:ss"))

# Identifier columns become integers.
for id_col in cols_n:
    df = df.withColumn(id_col, col(id_col).cast("int"))

df.printSchema()


root
 |-- rental_id: integer (nullable = true)
 |-- rental_date: timestamp (nullable = true)
 |-- inventory_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- return_date: timestamp (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- last_update: timestamp (nullable = true)



In [None]:
# Preview the first 5 rows of `df`.
# NOTE(review): the recorded output has the customer columns, but the cell
# that loads the "customer" sheet comes after this one -- confirm cell order.
df.show(5)

+-----------+--------+----------+---------+--------------------+----------+------+--------------------+--------------------+---------------+-------+
|customer_id|store_id|first_name|last_name|               email|address_id|active|         create_date|         last_update|customer_id_old|segment|
+-----------+--------+----------+---------+--------------------+----------+------+--------------------+--------------------+---------------+-------+
|          1|       1|      MARY|    SMITH| MARY.SMITH@sakil...|         5|     1| 2006-02-14 22:04:36| 2006-02-15 04:57:20|           NULL|   NULL|
|          2|       1|  PATRICIA|  JOHNSON| PATRICIA.JOHNSON...|         6|     1| 2006-02-14 22:04:36| 2006-02-15 04:57:20|           NULL|   NULL|
|          3|       1|     LINDA| WILLIAMS| LINDA.WILLIAMS@s...|         7|     1| 2006-02-14 22:04:36| 2006-02-15 04:57:20|           NULL|   NULL|
|          4|       2|   BARBARA|    JONES| BARBARA.JONES@sa...|         8|     1| 2006-02-14 22:04:36| 20

In [None]:
# List the column names of `df`.
# NOTE(review): the recorded output lists the customer columns, but the cell
# that loads the "customer" sheet comes after this one -- confirm cell order.
df.columns

['customer_id',
 'store_id',
 'first_name',
 'last_name',
 'email',
 'address_id',
 'active',
 'create_date',
 'last_update',
 'customer_id_old',
 'segment']

In [None]:
# --- customer sheet: load and clean ---------------------------------------
df_ax = pd.read_excel("Films_2.xlsx", sheet_name="customer")
df = spark.createDataFrame(df_ax)

# Strip surrounding whitespace from every column name in one projection.
df = df.toDF(*(name.strip() for name in df.columns))

cols_s = ['first_name',
          'last_name',
          'email',
          'customer_id_old',
          'segment']

cols_n = ["customer_id",
          "store_id",
          "address_id",
          "active"]

for name in cols_n:
    df = df.withColumn(name, col(name).cast("int"))

for name in cols_s:
    df = df.withColumn(name, trim(name))

for name in ["create_date", "last_update"]:
    df = df.withColumn(name, to_timestamp(trim(name), "yyyy-MM-dd HH:mm:ss"))

# Most frequent real segment, ignoring the 'NULL' placeholder. The secondary
# sort on the segment name makes the choice deterministic when counts tie.
# first() returns None on an empty result instead of raising IndexError the
# way collect()[0] does.
top_segment = (
    df.filter("segment != 'NULL'")
    .groupBy("segment")
    .count()
    .orderBy(col("count").desc(), col("segment"))
    .first()
)
val = top_segment["segment"] if top_segment is not None else None

# DataFrame.replace matches whole values exactly. The previous regexp_replace
# was unanchored, so it would also rewrite any value that merely contains the
# text "NULL", and it interpreted `val` as a regex replacement string.
if val is not None:
    df = df.replace("NULL", val, subset=["segment"])
# When no real segment exists there is nothing to impute with, so the
# placeholder is left in place.

df = df.replace("NULL", "Not Aplicable", subset=["customer_id_old"])





+-----------+-----+
|    segment|count|
+-----------+-----+
|   Consumer|  409|
|  Corporate|  236|
|Home Office|  148|
+-----------+-----+



In [None]:
# --- store sheet: load and clean ------------------------------------------
df_ax = pd.read_excel("Films_2.xlsx", sheet_name="store")
df = spark.createDataFrame(df_ax)

# Column names with surrounding whitespace removed.
df = df.toDF(*[name.strip() for name in df.columns])

# Trim last_update and parse it as a timestamp.
df = df.withColumn(
    "last_update",
    to_timestamp(trim("last_update"), "yyyy-MM-dd HH:mm:ss"),
)



In [None]:
from pyspark.ml.feature import Imputer

# Imputer configured with the "mean" strategy. Both column lists are empty
# here, so no input or output columns are configured yet.
imputer = Imputer(inputCols=[], outputCols=[])
imputer = imputer.setStrategy("mean")

In [None]:
# df.replace("NULL", np.nan)
# imputer.fit(df).transform(df)
# Show rows that actually carry an original_language_id.
# `col != NULL` compares with SQL NULL, which is never true, so the previous
# predicate always produced an empty table (as in the recorded output). Test
# for SQL NULL with IS NOT NULL and also exclude the 'NULL' placeholder text.
# NOTE(review): this needs `df` to be the film frame that still has the
# original_language_id column -- confirm which cell is expected to run first.
df.filter("original_language_id IS NOT NULL AND trim(original_language_id) != 'NULL'").show()

# NOTE(review): in the recorded outputs `rating` holds text labels such as
# G, NC-17 and R; confirm that comparing it with the number 3 is intended.
df.filter((df['rating'] <= 3) & (df['rental_duration'] < 6))

+-----+-----------+------------+-----------+--------------------+---------------+-----------+------+----------------+---------------+------+----------------+-----------+
|title|description|release_year|language_id|original_language_id|rental_duration|rental_rate|length|replacement_cost|num_voted_users|rating|special_features|last_update|
+-----+-----------+------------+-----------+--------------------+---------------+-----------+------+----------------+---------------+------+----------------+-----------+
+-----+-----------+------------+-----------+--------------------+---------------+-----------+------+----------------+---------------+------+----------------+-----------+



DataFrame[title: string, description: string, release_year: string, language_id: bigint, original_language_id: string, rental_duration: bigint, rental_rate: string, length: string, replacement_cost: string, num_voted_users: string, rating: string, special_features: string, last_update: string]