
# PySpark

## Step 1: Environment setup

In [5]:
# Install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

This cell runs a shell command that installs OpenJDK 8 (Java Development Kit) without graphical components ("headless") in the Google Colab environment. Let's break down each part of the command:

Explanation of the components:
- !: This symbol tells the Jupyter or Colab notebook that the following line is a shell command rather than Python code. In Google Colab, using ! lets you run terminal commands directly.
- apt-get install:
 - apt-get is the package management tool used in Debian-based Linux distributions (such as Ubuntu, which Google Colab uses) to install, update, or remove software packages.
 - install is the subcommand that tells apt-get to install the specified package, in this case OpenJDK 8.
- openjdk-8-jdk-headless:
 - openjdk-8-jdk refers to the OpenJDK (Java Development Kit) version 8, an open-source implementation of the Java Platform, Standard Edition.
 - headless means this variant leaves out GUI (Graphical User Interface) components, which Colab does not need. It is lighter and well suited to server or cloud environments.
- -qq: Quiet mode, level 2. It tells apt-get to suppress progress and informational output, which keeps the notebook output clean. Level 2 also implies -y, so apt-get does not stop to ask for confirmation.
- > /dev/null: This redirects the command's standard output to /dev/null, a special file in Linux that discards everything written to it. Only standard output is redirected; error messages are written to standard error and still appear in the notebook.

**Summary:**
This command installs OpenJDK 8 (Java Development Kit) in "headless" mode (without a graphical user interface) on a Debian-based system such as the one in Google Colab. It runs quietly and without prompts: normal output is discarded, while errors are still shown.
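Because the install runs silently, it can help to confirm afterwards which Java the system actually picks up. The sketch below is a plain-Python helper; the function names `parse_java_major` and `installed_java_major` are hypothetical, not part of any library. It assumes the usual OpenJDK banner format, where Java 8 reports `1.8.0_xxx` and newer releases report `11.0.x`, `17.0.x`, and so on. Spark 3.4 documents support for Java 8, 11 and 17.

```python
import re
import shutil
import subprocess
from typing import Optional


def parse_java_major(version_output: str) -> int:
    """Return the Java major version found in `java -version` output.

    Java 8 uses the legacy "1.8.0_xxx" numbering, while Java 9+
    report "11.0.x", "17.0.x", "21", and so on.
    """
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_output)
    if match is None:
        raise ValueError("no version string found")
    first, second = match.group(1), match.group(2)
    if first == "1" and second is not None:  # legacy 1.x scheme (Java 8 and older)
        return int(second)
    return int(first)


def installed_java_major() -> Optional[int]:
    """Run `java -version` if `java` is on the PATH; return None otherwise."""
    java = shutil.which("java")
    if java is None:
        return None
    # `java -version` writes its banner to standard error, not standard output.
    result = subprocess.run([java, "-version"], capture_output=True, text=True)
    return parse_java_major(result.stderr)
```

`installed_java_major()` reports the version of whichever `java` the PATH resolves to, which may differ from `JAVA_HOME` if several JDKs are installed on the machine.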


In [6]:
# Download Spark 3.4.3 (stable release, prebuilt for Hadoop 3)
!wget -q https://dlcdn.apache.org/spark/spark-3.4.3/spark-3.4.3-bin-hadoop3.tgz

This cell runs a shell command that downloads a file from the web with the wget tool in the Google Colab environment.

The command downloads the compressed Apache Spark 3.4.3 binary distribution (prebuilt for Hadoop 3) from the Apache download CDN. The -q flag turns off wget's output, so the download runs quietly. This is the usual first step when setting up Spark for data processing in Google Colab.

- wget is a command-line utility for downloading files from the web. It supports HTTP, HTTPS, and FTP, which makes it useful for retrieving files over the internet.

- https://dlcdn.apache.org/spark/spark-3.4.3/spark-3.4.3-bin-hadoop3.tgz: This is the URL of the file that wget downloads.
The file is spark-3.4.3-bin-hadoop3.tgz, an archive containing Apache Spark version 3.4.3 prebuilt for Hadoop 3.
 - Spark is a distributed computing framework used for big data processing.
 - Hadoop 3 is a framework for distributed storage and processing of large datasets across clusters of computers; the "bin-hadoop3" build ships the Hadoop 3 client libraries that Spark uses.
The .tgz extension indicates that the file is a compressed **tarball** archive (similar to a .zip file but more common on Unix/Linux systems). It combines .tar for archiving and .gz for compression.
dlcdn.apache.org serves only currently supported releases. Once a release is superseded it is removed from the CDN, and the same file remains available under https://archive.apache.org/dist/spark/.
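Since the CDN URL stops working once 3.4.3 is superseded, a download step can try the CDN first and fall back to the Apache archive. This is a minimal standard-library sketch; `spark_download_urls` and `download_first_available` are hypothetical helper names, and the URL layout is assumed to follow the pattern shown above.

```python
import os
import urllib.error
import urllib.request
from typing import List


def spark_download_urls(version: str, hadoop: str = "3") -> List[str]:
    """Candidate URLs for a prebuilt Spark tarball, in the order to try them."""
    name = f"spark-{version}-bin-hadoop{hadoop}.tgz"
    return [
        # Current releases only.
        f"https://dlcdn.apache.org/spark/spark-{version}/{name}",
        # Every release, including superseded ones.
        f"https://archive.apache.org/dist/spark/spark-{version}/{name}",
    ]


def download_first_available(urls: List[str], dest_dir: str = ".") -> str:
    """Download the first URL that succeeds and return the local file path."""
    last_error = None
    for url in urls:
        path = os.path.join(dest_dir, url.rsplit("/", 1)[-1])
        try:
            urllib.request.urlretrieve(url, path)
            return path
        except urllib.error.URLError as err:  # HTTPError (e.g. 404) is a subclass
            last_error = err
    raise RuntimeError(f"all downloads failed: {last_error}")
```

For example, `download_first_available(spark_download_urls("3.4.3"), "/content")`. Unlike wget, `urlretrieve` overwrites an existing file of the same name instead of saving a `.1` copy.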

In [7]:
# Extract the Spark .tgz archive
!tar xf /content/spark-3.4.3-bin-hadoop3.tgz

Explanation of the components:

- !: This symbol indicates that the line is a shell command, **not Python code**. It lets you run _terminal commands_ directly in Google Colab.

- tar is a Unix command for working with tarball files (compressed or uncompressed archive files). The .tar format is commonly used to bundle multiple files and directories into one file.

- xf: These are options passed to the tar command:
 - x: Extract mode. This option tells tar to extract the contents of the archive.
 - f: File. This option tells tar to read the archive from the file path that follows.
 No z (gzip) flag is needed: GNU tar detects gzip compression automatically when extracting.

- /content/spark-3.4.3-bin-hadoop3.tgz: This is the path to the archive being extracted:
 - The file is in /content/, Google Colab's default working directory, where the previous wget command saved it.
 - The .tgz extension indicates a tarball compressed with gzip.
 - The file contains Apache Spark version 3.4.3, prebuilt for Hadoop 3.
 - If the download cell is run more than once, wget does not overwrite the existing file; it saves the new copy as spark-3.4.3-bin-hadoop3.tgz.1, .2, and so on. This command always extracts the original file.


**Summary:**
This command extracts spark-3.4.3-bin-hadoop3.tgz, a gzip-compressed tar archive containing Apache Spark 3.4.3 for Hadoop 3. After it runs, the extracted files are in the spark-3.4.3-bin-hadoop3 directory inside the current working directory (/content/ in Colab).
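The same extraction can be done from Python with the standard `tarfile` module, which is handy outside a shell. This is a sketch; `extract_tgz` is a hypothetical helper name, and the safety filter is only used when the running Python version provides it.

```python
import tarfile
from typing import List


def extract_tgz(archive_path: str, dest_dir: str = ".") -> List[str]:
    """Extract a gzip-compressed tarball and return its top-level entry names."""
    with tarfile.open(archive_path, mode="r:gz") as tar:
        # The "data" filter (Python 3.12+, backported to some older patch
        # releases) rejects absolute paths and other unsafe archive members.
        kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}
        tar.extractall(dest_dir, **kwargs)
        return sorted({member.name.split("/", 1)[0] for member in tar.getmembers()})
```

In Colab, `extract_tgz("/content/spark-3.4.3-bin-hadoop3.tgz", "/content")` would be expected to return `["spark-3.4.3-bin-hadoop3"]`, the directory that `SPARK_HOME` points to.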

In [8]:
# Set the environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.3-bin-hadoop3"

This cell sets the environment variables that Java and Apache Spark need in the Google Colab environment. Let's break down each part:

#### Explanation:

- import os: This imports the os module, which provides functions for interacting with the operating system. Its os.environ mapping holds the environment variables of the current process; assigning to it sets a variable for this Python process and for any child processes it starts later, such as the Java process that PySpark launches.

- os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64": This line sets the JAVA_HOME environment variable.
JAVA_HOME is used by Java-based applications to locate the Java installation directory. Here it points to /usr/lib/jvm/java-8-openjdk-amd64, where apt installs OpenJDK 8 on a 64-bit (amd64) machine such as Colab's.
**Setting this variable ensures that when Java is required (for example, by Apache Spark), its installation can be found.**

- os.environ["SPARK_HOME"] = "/content/spark-3.4.3-bin-hadoop3": This line sets the SPARK_HOME environment variable.
SPARK_HOME tells tools where Apache Spark is installed. Here it points to /content/spark-3.4.3-bin-hadoop3, the directory created when the Spark 3.4.3 (Hadoop 3) archive was extracted.
**With this variable set, tools and scripts that need Spark, such as findspark, can locate its installation.**

#### Summary:
This cell sets two environment variables, JAVA_HOME and SPARK_HOME, so that Spark can find Java and its own installation directory. This setup is typical when configuring Spark to run in Google Colab.

- JAVA_HOME points to the OpenJDK 8 installation, which Spark needs to run.
- SPARK_HOME points to the directory where Spark 3.4.3 for Hadoop 3 was extracted.

Both must be set before starting Spark in the Colab environment.
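A typo in either path only surfaces later as a confusing Spark startup error, so a quick check right after setting the variables can save time. This is a sketch; `check_spark_env` is a hypothetical helper, and it assumes the standard layouts where the JDK has `bin/java` and the Spark distribution has `bin/spark-submit`.

```python
import os
from typing import List, Mapping


def check_spark_env(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return a list of problems with JAVA_HOME / SPARK_HOME (empty if all good)."""
    expected = {
        "JAVA_HOME": os.path.join("bin", "java"),
        "SPARK_HOME": os.path.join("bin", "spark-submit"),
    }
    problems = []
    for var, rel_path in expected.items():
        home = env.get(var)
        if not home:
            problems.append(f"{var} is not set")
        elif not os.path.isfile(os.path.join(home, rel_path)):
            problems.append(f"{var}={home} has no {rel_path}")
    return problems
```

Calling `check_spark_env()` with no arguments inspects the live `os.environ`, so an empty list means both paths look usable.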



In [9]:
# Install the 'findspark' library, which helps locate Spark on the system and import it as a regular library
!pip install -q findspark

In [10]:
# Import the installed findspark library and initialize it
import findspark
findspark.init()

**According to the findspark documentation:**

PySpark isn't on sys.path by default, but that doesn't mean it can't be used as a regular library. You can address this by either symlinking pyspark into your site-packages, or adding pyspark to sys.path at runtime. findspark does the latter.

To initialize PySpark, just call

```
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName="myAppName")
```
Without any arguments, the SPARK_HOME environment variable will be used, and if that isn't set, other possible install locations will be checked.

Alternatively, you can specify a location with the spark_home argument.

```
findspark.init('/path/to/spark_home')
```
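The core idea of "adding pyspark to sys.path at runtime" can be sketched in a few lines. This is a simplified illustration under the assumption that the unpacked distribution keeps its Python sources in `python/` and ships the py4j bridge as `python/lib/py4j-*.zip`; it is not findspark's actual source, and `pyspark_paths` / `add_pyspark_to_sys_path` are hypothetical names.

```python
import glob
import os
import sys
from typing import List


def pyspark_paths(spark_home: str) -> List[str]:
    """Paths that let `import pyspark` work from an unpacked Spark distribution."""
    spark_python = os.path.join(spark_home, "python")
    # py4j (the Python <-> JVM bridge) is shipped as a zip inside python/lib.
    py4j_zips = glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip"))
    if not py4j_zips:
        raise FileNotFoundError(f"no py4j zip under {spark_python}/lib")
    # If several zips exist, take the last one in sort order.
    return [spark_python, sorted(py4j_zips)[-1]]


def add_pyspark_to_sys_path(spark_home: str) -> None:
    """Prepend the PySpark paths to sys.path, keeping their relative order."""
    for path in reversed(pyspark_paths(spark_home)):
        if path not in sys.path:
            sys.path.insert(0, path)
```

With `SPARK_HOME` set as above, `add_pyspark_to_sys_path(os.environ["SPARK_HOME"])` would make `import pyspark` resolve to the extracted 3.4.3 distribution.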






## Step 2: Spark Session


In [11]:
# Create the Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master('local')\
        .appName('sparkcolab')\
        .getOrCreate()

This Python code creates a PySpark session in the Google Colab environment, giving you access to Apache Spark's DataFrame and SQL APIs for data processing. Let's break it down step by step:

#### Explanation of the components:

- from pyspark.sql import SparkSession: This imports the SparkSession class from PySpark's SQL module.
**SparkSession is the entry point for using Apache Spark's DataFrame and SQL APIs in Python.** It provides an interface for reading data, creating DataFrames, and running queries.

- spark = SparkSession.builder: **SparkSession.builder returns a Builder object that collects the configuration for the session; nothing is created until getOrCreate() is called at the end of the chain.**
The resulting session, stored in spark, is the object you use to run Spark operations such as loading datasets, performing transformations, and executing computations.

- .master('local'): **The .master() method sets the master URL, that is, the cluster manager or mode Spark runs in.**
'local' means that Spark runs on the current machine (_in this case, the Google Colab VM_) rather than on a cluster. Plain 'local' uses a single worker thread; 'local[N]' uses N threads and 'local[*]' uses one thread per logical CPU core. **Local mode is suitable for testing or small-scale data processing on Google Colab.**

- .appName('sparkcolab'): The .appName() method assigns a name to your Spark application.
Here the application is named 'sparkcolab'. **The name identifies the application in logs and in Spark's web UI, which is useful when several applications are running.**

- .getOrCreate(): **This method returns the existing Spark session if one is already running, or creates a new one if none exists.**
*Because only one SparkContext can be active per JVM, this lets you re-run the cell without starting a second Spark instance.*

**What the code does:**
It creates a Spark session in Google Colab, configured to run locally and named 'sparkcolab'. The session lets you load, transform, and analyze data with Spark DataFrames or SQL queries.

**Summary of the arguments:**
- master('local'): runs Spark in local mode, processing data on the current machine with a single worker thread.
- appName('sparkcolab'): assigns the name 'sparkcolab' to the Spark application.
- getOrCreate(): returns the existing Spark session or creates a new one.

This step is required before using PySpark for data processing in Colab.
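To make the local master URLs concrete, the hypothetical helper below maps a master string to the number of worker threads it requests, following the forms listed in Spark's documentation (`local`, `local[K]`, `local[K,F]`, `local[*]`, `local[*,F]`, where F is the allowed task failures). It is an illustration only; Spark parses these strings itself.

```python
import os
import re
from typing import Optional


def local_worker_threads(master: str) -> Optional[int]:
    """Worker threads requested by a Spark `local` master URL.

    Returns None for non-local masters such as "yarn" or "spark://host:7077".
    """
    if master == "local":
        return 1  # plain "local": a single worker thread, no parallelism
    match = re.fullmatch(r"local\[(\*|\d+)(?:,\d+)?\]", master)
    if match is None:
        return None
    threads = match.group(1)
    # "*" means one thread per logical core of the machine.
    return os.cpu_count() if threads == "*" else int(threads)
```

For example, `local_worker_threads("local[4]")` returns 4. Passing `.master('local[*]')` to the builder would let Spark use all of Colab's cores instead of one.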


## Step 3: Dataset

In [21]:
# Import the dataset
# Open an upload widget to load the files from the local machine
from google.colab import files
arquivo = files.upload()

Saving title.ratings.tsv to title.ratings (1).tsv
Saving title.basics.tsv to title.basics.tsv


DSV (delimiter-separated values) files, such as .csv or .tsv (as in this case), are supported by many Python libraries, including PySpark and pandas.

Here, passing the separator (tab) as an argument was enough for Spark to read the file correctly into the DataFrame below:

```
 sep=r'\t'
```
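The IMDb files also mark missing values with the literal string `\N`. Spark keeps it as text, which is why `\N` shows up in the outputs below and why `inferSchema` leaves columns such as `startYear` as strings; Spark's CSV reader accepts a `nullValue` option for this. The plain-Python sketch below shows the same idea with the standard `csv` module; `read_imdb_tsv` is a hypothetical helper, and it assumes the files use no quoting.

```python
import csv
import io
from typing import Dict, Iterable, Iterator, Optional

IMDB_NULL = r"\N"  # IMDb's marker for a missing value


def read_imdb_tsv(lines: Iterable[str]) -> Iterator[Dict[str, Optional[str]]]:
    """Yield one dict per data row, mapping the \\N marker to None."""
    # Tab-separated, and quote characters are treated as ordinary text.
    reader = csv.DictReader(lines, delimiter="\t", quoting=csv.QUOTE_NONE)
    for row in reader:
        yield {key: (None if value == IMDB_NULL else value) for key, value in row.items()}


sample = (
    "tconst\ttitleType\tprimaryTitle\tstartYear\tendYear\n"
    "tt0000001\tshort\tCarmencita\t1894\t\\N\n"
)
rows = list(read_imdb_tsv(io.StringIO(sample)))
```

Here `rows[0]["endYear"]` is `None` rather than the string `\N`. The Spark counterpart would be passing `nullValue=r'\N'` to `spark.read.csv`, after which `inferSchema` can type numeric columns properly.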


In [20]:
type(arquivo)

dict

In [22]:
df = spark.read.csv('title.basics.tsv', header=True, sep=r'\t', inferSchema=True)

In [23]:
# Check that the schema and column data types look right
df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- endYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)



In [26]:
# Check how the DataFrame was built
df.show(10)

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|     \N|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|     \N|             5|     Animation,Short|
|tt0000003|    short|      Pauvre Pierrot|      Pauvre Pierrot|      0|     1892|     \N|             5|Animation,Comedy,...|
|tt0000004|    short|         Un bon bock|         Un bon bock|      0|     1892|     \N|            12|     Animation,Short|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|      0|     1893|     \N|             1|        Comedy

In [25]:
# Create the reviews (ratings) DataFrame
df_reviews = spark.read.csv('title.ratings.tsv', header=True, sep=r'\t', inferSchema=True)
df_reviews.show(10)

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    2090|
|tt0000002|          5.6|     283|
|tt0000003|          6.5|    2094|
|tt0000004|          5.4|     184|
|tt0000005|          6.2|    2828|
|tt0000006|          5.0|     196|
|tt0000007|          5.4|     889|
|tt0000008|          5.4|    2233|
|tt0000009|          5.4|     214|
|tt0000010|          6.8|    7699|
+---------+-------------+--------+
only showing top 10 rows



In [33]:
df_reviews.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- numVotes: integer (nullable = true)



## Step 4: Exploring the dataset / feature engineering

In [30]:
# Select columns
df_movie = df.select('primaryTitle', 'titleType', 'startYear','genres')

In [31]:
# Display the columns
df_movie.show(10)

+--------------------+---------+---------+--------------------+
|        primaryTitle|titleType|startYear|              genres|
+--------------------+---------+---------+--------------------+
|          Carmencita|    short|     1894|   Documentary,Short|
|Le clown et ses c...|    short|     1892|     Animation,Short|
|      Pauvre Pierrot|    short|     1892|Animation,Comedy,...|
|         Un bon bock|    short|     1892|     Animation,Short|
|    Blacksmith Scene|    short|     1893|        Comedy,Short|
|   Chinese Opium Den|    short|     1894|               Short|
|Corbett and Court...|    short|     1894|         Short,Sport|
|Edison Kinetoscop...|    short|     1894|   Documentary,Short|
|          Miss Jerry|    movie|     1894|             Romance|
| Leaving the Factory|    short|     1895|   Documentary,Short|
+--------------------+---------+---------+--------------------+
only showing top 10 rows



In [34]:
# Create a new column
# 'startYear' was read as a string, so cast it to int
df_year = df_movie.withColumn('year', df_movie['startYear'].cast('int')).drop('startYear')
df_year.show(10)

+--------------------+---------+--------------------+----+
|        primaryTitle|titleType|              genres|year|
+--------------------+---------+--------------------+----+
|          Carmencita|    short|   Documentary,Short|1894|
|Le clown et ses c...|    short|     Animation,Short|1892|
|      Pauvre Pierrot|    short|Animation,Comedy,...|1892|
|         Un bon bock|    short|     Animation,Short|1892|
|    Blacksmith Scene|    short|        Comedy,Short|1893|
|   Chinese Opium Den|    short|               Short|1894|
|Corbett and Court...|    short|         Short,Sport|1894|
|Edison Kinetoscop...|    short|   Documentary,Short|1894|
|          Miss Jerry|    movie|             Romance|1894|
| Leaving the Factory|    short|   Documentary,Short|1895|
+--------------------+---------+--------------------+----+
only showing top 10 rows



In [35]:
df_year.printSchema()

root
 |-- primaryTitle: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- year: integer (nullable = true)
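With ANSI mode off (`spark.sql.ansi.enabled=false`, the default in Spark 3.4), `cast('int')` returns null for strings that are not integers, such as `\N`. In the filter below, a null `year` makes `year > 2020` null as well, so those rows are dropped. This plain-Python sketch mirrors that behaviour; `cast_to_int` and `years_after` are hypothetical names, and the integer pattern is only an approximation of Spark's cast rules.

```python
import re
from typing import List, Optional

_INT_PATTERN = re.compile(r"[+-]?\d+")


def cast_to_int(value: Optional[str]) -> Optional[int]:
    """Rough stand-in for a non-ANSI Spark cast('int'): invalid input -> None."""
    if value is None:
        return None
    text = value.strip()
    return int(text) if _INT_PATTERN.fullmatch(text) else None


def years_after(values: List[Optional[str]], threshold: int) -> List[int]:
    """Keep years greater than threshold; None never passes, like SQL NULL."""
    years = (cast_to_int(v) for v in values)
    return [y for y in years if y is not None and y > threshold]
```

For example, `years_after(["1894", r"\N", "2021"], 2020)` keeps only 2021.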



In [37]:
# Filter on the new column without storing the result in a new object
df_year.filter(df_year.year > 2020).show()

+--------------------+---------+--------------------+----+
|        primaryTitle|titleType|              genres|year|
+--------------------+---------+--------------------+----+
|Istoriya grazhdan...|    movie|         Documentary|2021|
|            Aufsätze|    short|               Short|2021|
|Number 14: Late S...|    short|               Short|2023|
|   Socialist Realism|    movie|               Drama|2023|
|       Anything Goes|tvEpisode|  Comedy,Drama,Music|2022|
|Histórias de Comb...|    movie|         Documentary|2022|
|      Loading Ludwig|    movie|                  \N|2022|
|Beach Birds for C...|    short|               Short|2024|
|  Neues in Wittstock|    movie|         Documentary|2021|
|       Fado Lusitano|    short|Animation,History...|2023|
|The Surgeon of th...|    movie|     Biography,Drama|2022|
|        Bratrovrazda|    short|               Short|2024|
|           Nine Ball|    movie|                  \N|2023|
|       Truth or Dare|    short|  Comedy,Drama,Short|202

In [38]:
# Count how many times each genre / genre combination appears across all titles (not only movies)
from pyspark.sql.functions import count,col,asc,desc
df_sum = df_year.groupBy('genres').count()
df_sum.show(truncate=False)

+---------------------------+-----+
|genres                     |count|
+---------------------------+-----+
|Comedy,Sport               |3697 |
|Action,War,Western         |1    |
|Action,Adventure,Fantasy   |7436 |
|Documentary,Drama,Fantasy  |168  |
|Adult,Comedy,Musical       |11   |
|Crime,Horror,Short         |405  |
|Adult,Horror,Sci-Fi        |40   |
|Documentary,News,Reality-TV|608  |
|Fantasy,Horror,Musical     |16   |
|Action,Adult,Short         |5    |
|Adult,Game-Show            |3    |
|Animation,Sci-Fi,War       |62   |
|Music,Musical,Short        |162  |
|Adult,Comedy,Talk-Show     |3    |
|Animation,Sport,Thriller   |53   |
|Documentary,Western        |365  |
|Adventure,Family,Fantasy   |5317 |
|Comedy,Drama,Western       |245  |
|Game-Show,Reality-TV       |52122|
|Game-Show,Music,Mystery    |277  |
+---------------------------+-----+
only showing top 20 rows



In [39]:
# Same calculation, sorted by count in descending order
df_sum.orderBy(col('count').desc()).show()

+-----------------+-------+
|           genres|  count|
+-----------------+-------+
|            Drama|1254900|
|           Comedy| 734612|
|        Talk-Show| 685713|
|             News| 576649|
|      Documentary| 532383|
|    Drama,Romance| 515088|
|               \N| 495816|
|       Reality-TV| 353243|
|            Adult| 311806|
|   News,Talk-Show| 248358|
|            Short| 216810|
|      Drama,Short| 208419|
|           Family| 189750|
|        Game-Show| 185528|
|     Comedy,Short| 159514|
|Documentary,Short| 154666|
|            Sport| 132552|
|          Romance| 116001|
| Comedy,Talk-Show| 113480|
|            Music| 108621|
+-----------------+-------+
only showing top 20 rows
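Because `genres` is a comma-joined string, `groupBy('genres')` counts combinations: "Drama" and "Drama,Romance" are separate groups. Counting individual genres means splitting the string first. The plain-Python sketch below shows the idea on a few values from the outputs above; `count_individual_genres` is a hypothetical helper, and it skips missing values (`\N` or None).

```python
from collections import Counter
from typing import Iterable, List, Optional, Tuple


def count_individual_genres(
    genre_strings: Iterable[Optional[str]],
) -> List[Tuple[str, int]]:
    """Split comma-joined genre lists and count each genre, most common first."""
    counts: Counter = Counter()
    for genres in genre_strings:
        if genres is None or genres == r"\N":  # skip missing values
            continue
        counts.update(g.strip() for g in genres.split(","))
    return counts.most_common()
```

In PySpark, the same per-genre breakdown comes from `split` and `explode` in `pyspark.sql.functions`, applied before `groupBy(...).count()`.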



In [40]:
# Test 'isin'
# isin('Comedy') keeps only rows whose genres value is exactly 'Comedy'
df_year.select('primaryTitle','year','genres').filter(df_year.genres.isin('Comedy')).show()

+--------------------+----+------+
|        primaryTitle|year|genres|
+--------------------+----+------+
|          Salome Mad|1909|Comedy|
|Jarní sen starého...|1913|Comedy|
|     El bello Arturo|1913|Comedy|
|   Le dernier pardon|1913|Comedy|
|Fulano de Tal se ...|1916|Comedy|
|My Husband's Gett...|1913|Comedy|
|Pommy Arrives in ...|1913|Comedy|
|Battle of Gettysgoat|1914|Comedy|
|   Det blaa vidunder|1915|Comedy|
| Brewster's Millions|1914|Comedy|
|              C.O.D.|1914|Comedy|
|   The Country Mouse|1914|Comedy|
|The Education of ...|1914|Comedy|
|            Engelein|1914|Comedy|
|The Fates and Flo...|1914|Comedy|
|The Perfect Thirt...|1914|Comedy|
|A Florida Enchant...|1914|Comedy|
|  The Fortune Hunter|1914|Comedy|
|      Hombre o mujer|1914|Comedy|
|I kammerherrens k...|1914|Comedy|
+--------------------+----+------+
only showing top 20 rows
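`isin` compares the whole string, so it does not find "Comedy" inside "Comedy,Short". That is also why rows such as "Comedy,Short" still appear in the 'not in' output below. The sketch contrasts exact matching with list membership in plain Python; `is_exactly` and `contains_genre` are hypothetical helpers. One difference from Spark: here a missing value simply gives False, while in Spark both `isin` and `~isin` evaluate to null on a null column, so such rows are dropped by either filter.

```python
from typing import Optional


def is_exactly(genres: Optional[str], wanted: str) -> bool:
    """Mirror of col.isin(wanted): the whole string must equal the value."""
    return genres == wanted


def contains_genre(genres: Optional[str], wanted: str) -> bool:
    """True when `wanted` is one of the comma-separated genres."""
    if genres is None:
        return False
    return wanted in genres.split(",")
```

In PySpark, a membership test along these lines could combine `split` with `array_contains` from `pyspark.sql.functions`.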



In [41]:
# Test 'not in'
# Just prefix the 'filter' condition with '~' to negate it
# (rows such as 'Comedy,Short' are kept: only exact 'Comedy' values are excluded)
df_year.select('primaryTitle','year','genres').filter(~df_year.genres.isin('Comedy')).show()

+--------------------+----+--------------------+
|        primaryTitle|year|              genres|
+--------------------+----+--------------------+
|          Carmencita|1894|   Documentary,Short|
|Le clown et ses c...|1892|     Animation,Short|
|      Pauvre Pierrot|1892|Animation,Comedy,...|
|         Un bon bock|1892|     Animation,Short|
|    Blacksmith Scene|1893|        Comedy,Short|
|   Chinese Opium Den|1894|               Short|
|Corbett and Court...|1894|         Short,Sport|
|Edison Kinetoscop...|1894|   Documentary,Short|
|          Miss Jerry|1894|             Romance|
| Leaving the Factory|1895|   Documentary,Short|
|Akrobatisches Pot...|1895|   Documentary,Short|
|The Arrival of a ...|1896|   Documentary,Short|
|The Photographica...|1895|   Documentary,Short|
| The Waterer Watered|1895|        Comedy,Short|
| Autour d'une cabine|1894|     Animation,Short|
|Boat Leaving the ...|1895|   Documentary,Short|
|Italienischer Bau...|1895|   Documentary,Short|
|Das boxende Känguru

In [46]:
# Using SQL inside PySpark
# Remember the importance of using views (Datacamp): a temporary view makes the DataFrame queryable by name in SQL
df_year.createOrReplaceTempView('movies')
spark.sql('SELECT year, COUNT(*) AS qtd FROM movies GROUP BY year ORDER BY qtd desc').show(truncate=False)

+----+-------+
|year|qtd    |
+----+-------+
|null|1415326|
|2021|495764 |
|2022|474870 |
|2018|451505 |
|2019|446693 |
|2017|445271 |
|2023|432573 |
|2020|427347 |
|2016|421320 |
|2015|396920 |
|2014|376866 |
|2013|355610 |
|2012|331513 |
|2011|292588 |
|2010|260439 |
|2024|247413 |
|2009|227869 |
|2008|216505 |
|2007|200764 |
|2006|179814 |
+----+-------+
only showing top 20 rows
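The null row at the top comes from the earlier cast: every `startYear` that was `\N` became null, and `GROUP BY` puts all nulls into a single group. `COUNT(*)` counts those rows, while `COUNT(year)` would skip them, and a `WHERE year IS NOT NULL` clause removes the group. The sketch reproduces the behaviour with the standard `sqlite3` module as a stand-in for Spark SQL; `count_by_year` is a hypothetical helper and the rows in the usage note are made up.

```python
import sqlite3
from typing import List, Optional, Sequence, Tuple


def count_by_year(
    rows: Sequence[Tuple[str, Optional[int]]],
) -> List[Tuple[Optional[int], int]]:
    """GROUP BY year with COUNT(*), largest groups first; NULL years form one group."""
    con = sqlite3.connect(":memory:")
    try:
        con.execute("CREATE TABLE movies (primaryTitle TEXT, year INTEGER)")
        con.executemany("INSERT INTO movies VALUES (?, ?)", rows)
        query = (
            "SELECT year, COUNT(*) AS qtd FROM movies "
            "GROUP BY year ORDER BY qtd DESC, year"
        )
        return con.execute(query).fetchall()
    finally:
        con.close()
```

With three rows whose year is `None`, two from 2021 and one from 2022, the result is `[(None, 3), (2021, 2), (2022, 1)]`, the same shape as the Spark output above.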



Doing joins using both tables

In [47]:
# Join
# This needs adjusting to match our updated dataset
df.join(df_reviews, df.tconst == df_reviews.tconst, 'inner')\
    .select(df.primaryTitle, df.startYear, df.genres, df.titleType, df_reviews.averageRating, df_reviews.numVotes)\
    .show(truncate=False)

+--------------------------------------+---------+-------------------+---------+-------------+--------+
|primaryTitle                          |startYear|genres             |titleType|averageRating|numVotes|
+--------------------------------------+---------+-------------------+---------+-------------+--------+
|Le clown et ses chiens                |1892     |Animation,Short    |short    |5.6          |283     |
|Edison Kinetoscopic Record of a Sneeze|1894     |Documentary,Short  |short    |5.4          |2233    |
|Autour d'une cabine                   |1894     |Animation,Short    |short    |6.1          |1216    |
|Italienischer Bauerntanz              |1895     |Documentary,Short  |short    |4.6          |355     |
|The Clown Barber                      |1898     |Comedy,Short       |short    |5.1          |32      |
|The Bohemian Encampment               |1896     |Documentary,Short  |short    |3.6          |36      |
|Cortège de tzar au Bois de Boulogne   |1896     |Documentary,Sh

In [48]:
df_complete = df.join(df_reviews, df.tconst == df_reviews.tconst, 'inner')\
    .select(df.primaryTitle, df.startYear, df.genres, df.titleType, df_reviews.averageRating, df_reviews.numVotes)
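An inner join keeps only titles that have a rating; titles without a match in `title.ratings.tsv` are dropped. The plain-Python hash join below illustrates those semantics only. Spark chooses its own strategy, such as broadcast or sort-merge joins. `inner_join` is a hypothetical helper. Joining with `on='tconst'` (a column name instead of an equality expression) also keeps a single `tconst` column in Spark, which avoids the ambiguous duplicate column produced by `df.tconst == df_reviews.tconst`.

```python
from typing import Any, Dict, List


def inner_join(left: List[Dict[str, Any]], right: List[Dict[str, Any]], key: str) -> List[Dict[str, Any]]:
    """Hash join: index the right side by key, then probe it with each left row."""
    index: Dict[Any, List[Dict[str, Any]]] = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    joined = []
    for row in left:
        # Left rows without a match produce nothing (inner join semantics).
        for match in index.get(row[key], []):
            # Merging the dicts keeps the key column only once.
            joined.append({**row, **match})
    return joined
```

In the notebook, the Spark equivalent would be `df.join(df_reviews, on='tconst', how='inner')`.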

In [49]:
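# Maximum, without saving it in a variable
# What is the highest number of votes?
# Import the functions module under an alias so Spark's max does not shadow Python's built-in max
from pyspark.sql import functions as F
df_complete.agg(F.max('numVotes').alias('max_votes')).show()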

+---------+
|max_votes|
+---------+
|  2942823|
+---------+



In [52]:
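# Using the maximum from above, filter the title with the most votes
# (read it from the DataFrame instead of hard-coding 2942823, which changes as the dataset is updated)
max_votes = df_complete.agg({'numVotes': 'max'}).first()[0]
df_max_votos = df_complete.filter(df_complete.numVotes == max_votes)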
df_max_votos.show()

+--------------------+---------+------+---------+-------------+--------+
|        primaryTitle|startYear|genres|titleType|averageRating|numVotes|
+--------------------+---------+------+---------+-------------+--------+
|The Shawshank Red...|     1994| Drama|    movie|          9.3| 2942823|
+--------------------+---------+------+---------+-------------+--------+
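Filtering on the computed maximum returns every title tied for first place, whereas sorting with `orderBy(col('numVotes').desc()).limit(1)` returns exactly one row. The plain-Python sketch below shows the "keep all ties" variant; `rows_with_max` is a hypothetical helper and the records in the usage note are made up.

```python
from typing import Any, Dict, List


def rows_with_max(rows: List[Dict[str, Any]], column: str) -> List[Dict[str, Any]]:
    """Return every row whose `column` equals the column maximum (ties kept)."""
    if not rows:
        return []
    top = max(row[column] for row in rows)  # built-in max, not Spark's
    return [row for row in rows if row[column] == top]
```

For instance, with two rows holding 10 votes and one holding 3, `rows_with_max(rows, "numVotes")` returns both 10-vote rows.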



At the end, add a few processing-time tests using the IPython cell magic
```
%%timeit
```
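`%%timeit` is an IPython cell magic; outside a notebook, the standard `timeit` module does the same job. When timing Spark, keep in mind that transformations such as `filter` or `join` are lazy. Time an action such as `df_complete.count()`, otherwise only the query-plan construction is measured, and later runs can be faster when data is cached (for example after `df.cache()`). The sketch times a plain-Python workload; `count_genres` and the sample data are hypothetical stand-ins.

```python
import timeit
from collections import Counter


def count_genres(genres_column):
    """The workload being timed: a plain-Python group-and-count."""
    return Counter(genres_column).most_common(3)


sample = ["Drama", "Comedy", "Drama,Romance", "Drama"] * 10_000

# 5 repetitions of 10 calls each; the minimum is the least noisy estimate.
timings = timeit.repeat(lambda: count_genres(sample), repeat=5, number=10)
best_per_call = min(timings) / 10
print(f"best: {best_per_call * 1e3:.2f} ms per call")
```

In the notebook, the equivalent would be a cell starting with `%%timeit` followed by the action to measure.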

