<a href="https://colab.research.google.com/github/sergioaugusto94/Netflix_vs_Amazon_Analysis_using_Spark/blob/main/Netflix_vs_Amazon_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Uma empresa fictícia de streaming de video está com o desafio de fazer estudos de mercado para formular a estratégia de negócio que irão adotar.
Para uma parte do estudo de mercado a empresa precisa fazer análises em cima filmes e séries que estão disponíveis na Amazon e na Netflix, duas de suas concorrentes diretas, respondendo a várias perguntas de negócio relacionadas aos
dados da Amazon e da Netflix, que estão listadas abaixo:

- Quantos filmes estão disponíveis na Amazon?
- Quantos filmes estão disponíveis na Netflix?
- Dos filmes disponíveis na Amazon, quantos % estão disponíveis na Netflix?
- Qual ano de lançamento possui mais filmes na Netflix?
- Quais filmes que não estão disponíveis no catálogo da Netflix foram melhor avaliados (notas 4 e 5)?
- Quais filmes que não estão disponíveis no catálogo da Netflix foram melhor avaliados (notas 4 e 5)?


No link abaixo, são encontradas as bases de dados necessárias para a execução desse trabalho:

● Netflix: https://www.kaggle.com/netflix-inc/netflix-prize-data

● Amazon: https://s3.amazonaws.com/amazon-reviews-pds/readme.html

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"
import findspark
findspark.init()


Criando a seção Spark

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

sc = SparkContext('local', 'First_App')
spark = SparkSession(sc)

Carregando os arquivos contendo as avaliações dos usuários para cada código de filme, usando o separador de linhas \n

In [None]:
df1 = spark.read.csv('/content/combined_data_1.txt', sep = '\n', ignoreTrailingWhiteSpace = True)
df1 = df1.select('_c0')

df2 = spark.read.csv('/content/combined_data_2.txt', sep = '\n', ignoreTrailingWhiteSpace = True)
df2 = df2.select('_c0')

df3 = spark.read.csv('/content/combined_data_3.txt', sep = '\n', ignoreTrailingWhiteSpace = True)
df3 = df3.select('_c0')

df4 = spark.read.csv('/content/combined_data_4.txt', sep = '\n', ignoreTrailingWhiteSpace = True)
df4 = df4.select('_c0')

Unido os Dataframes em um único df

In [None]:
df = df1.union(df2);
df = df.union(df3);
df = df.union(df4);

df.show(5)

+--------------------+
|                 _c0|
+--------------------+
|                  1:|
|1488844,3,2005-09-06|
| 822109,5,2005-05-13|
| 885013,4,2005-10-19|
|  30878,4,2005-12-26|
+--------------------+
only showing top 5 rows



Separando os dados em colunas. Detalhe que quando temos o código do filme, as colunas 'Rating' e 'Date' ficam com valores nulos e a coluna CLient_Id contém o código do filme juntamente com :

In [None]:
from pyspark.sql.functions import split

split_col =  split(df._c0, '\\,',)

df = df.withColumn("Client_Id", split_col.getItem(0))\
    .withColumn("Rating", split_col.getItem(1))\
    .withColumn("Date", split_col.getItem(2))\
    .drop("_c0")
  
df.show(5)

+---------+------+----------+
|Client_Id|Rating|      Date|
+---------+------+----------+
|       1:|  null|      null|
|  1488844|     3|2005-09-06|
|   822109|     5|2005-05-13|
|   885013|     4|2005-10-19|
|    30878|     4|2005-12-26|
+---------+------+----------+
only showing top 5 rows



Cria uma nova coluna 'filme' com os códigos dos filmes. As linhas de código identificam como sendo o Id de filme pelo fato de logo depois do Id haver o caracter ':'

A última linha preenche com Null as linhas na qual não contém o Id do filme.

In [None]:
import pyspark.sql.functions as f

df = df.select('Client_Id', 'Rating', 'Date', 
               f.regexp_extract("Client_Id", pattern="^[0-9]+(?=:)", idx=0).alias('filme'))

df = df.withColumn('filme', f.when(f.col('filme') == '', None).otherwise(f.col('filme')))

df.show(5)


+---------+------+----------+-----+
|Client_Id|Rating|      Date|filme|
+---------+------+----------+-----+
|       1:|  null|      null|    1|
|  1488844|     3|2005-09-06| null|
|   822109|     5|2005-05-13| null|
|   885013|     4|2005-10-19| null|
|    30878|     4|2005-12-26| null|
+---------+------+----------+-----+
only showing top 5 rows



Cria uma coluna com valores enumerados a partir de 0, que será utilizada em uma próxima etapa do tratamento dos dados. 

In [None]:
df = df.withColumn("id", f.monotonically_increasing_id())

df.show(5)

+---------+------+----------+-----+---+
|Client_Id|Rating|      Date|filme| id|
+---------+------+----------+-----+---+
|       1:|  null|      null|    1|  0|
|  1488844|     3|2005-09-06| null|  1|
|   822109|     5|2005-05-13| null|  2|
|   885013|     4|2005-10-19| null|  3|
|    30878|     4|2005-12-26| null|  4|
+---------+------+----------+-----+---+
only showing top 5 rows



Nessa linha, a coluna com os códigos dos filmes é criada usando a coluna 'filme' como referência.

In [None]:
#preenche a coluna com os id de filme
from pyspark.sql import Window

df = df.withColumn('Movie_Id',f.when(df.filme>0,df.filme).otherwise(f.last(df.filme,ignorenulls=True).over(Window.orderBy("id"))))

df.show(5)

+---------+------+----------+-----+---+--------+
|Client_Id|Rating|      Date|filme| id|Movie_Id|
+---------+------+----------+-----+---+--------+
|       1:|  null|      null|    1|  0|       1|
|  1488844|     3|2005-09-06| null|  1|       1|
|   822109|     5|2005-05-13| null|  2|       1|
|   885013|     4|2005-10-19| null|  3|       1|
|    30878|     4|2005-12-26| null|  4|       1|
+---------+------+----------+-----+---+--------+
only showing top 5 rows



Aqui removemos as linhas que possuem valores nulos.

In [None]:
df = df.where(f.col("Rating").isNotNull())

df.show(5)

+---------+------+----------+-----+---+--------+
|Client_Id|Rating|      Date|filme| id|Movie_Id|
+---------+------+----------+-----+---+--------+
|  1488844|     3|2005-09-06| null|  1|       1|
|   822109|     5|2005-05-13| null|  2|       1|
|   885013|     4|2005-10-19| null|  3|       1|
|    30878|     4|2005-12-26| null|  4|       1|
|   823519|     3|2004-05-03| null|  5|       1|
+---------+------+----------+-----+---+--------+
only showing top 5 rows



E por último selecionamos apenas as colunas de interesse, finalizando o tratamento desses arquivos.

In [None]:
df = df.select('Movie_Id','Rating')

df.show(5)

+--------+------+
|Movie_Id|Rating|
+--------+------+
|       1|     3|
|       1|     5|
|       1|     4|
|       1|     4|
|       1|     3|
+--------+------+
only showing top 5 rows



Agora carregamos os dados da Amazon, usando o separado por tabulação.

In [None]:
amazon = spark.read.csv('/content/amazon_reviews_us_Digital_Video_Download_v1_00.tsv', sep=r'\t', inferSchema=True, header=True)


E aqui, carregamos os nome dos filmes da Netflix, não sendo necessário nenhum tratamento adicional. 

In [None]:
netflix_filmes = spark.read.csv('/content/movie_titles.csv', sep=r',', inferSchema=True)
netflix_filmes = netflix_filmes.withColumnRenamed('_c1','Ano')
netflix_filmes = netflix_filmes.withColumnRenamed('_c2','Filme')

In [None]:
from pyspark.sql.functions import concat_ws,col

netflix_filmes = netflix_filmes.select(concat_ws('_',netflix_filmes.Filme,netflix_filmes.Ano)
              .alias("Filme+data"),"Ano","Filme", '_c0')

In [None]:
netflix_filmes.show(5, truncate=False)

+---------------------------------+----+----------------------------+---+
|Filme+data                       |Ano |Filme                       |_c0|
+---------------------------------+----+----------------------------+---+
|Dinosaur Planet_2003             |2003|Dinosaur Planet             |1  |
|Isle of Man TT 2004 Review_2004  |2004|Isle of Man TT 2004 Review  |2  |
|Character_1997                   |1997|Character                   |3  |
|Paula Abdul's Get Up & Dance_1994|1994|Paula Abdul's Get Up & Dance|4  |
|The Rise and Fall of ECW_2004    |2004|The Rise and Fall of ECW    |5  |
+---------------------------------+----+----------------------------+---+
only showing top 5 rows



Quantos Filmes estão disponíveis na Amazon? Contaram-se apenas os valores não duplicados.

In [None]:
from pyspark.sql.functions import countDistinct

n_amazon = amazon.agg(countDistinct('product_title')).collect()[0].asDict()['count(product_title)']
print('Filmes Amazon: ', n_amazon)

Filmes Amazon:  42399


Quantos Filmes estão disponíveis na Netflix?

- Foi usada a coluna 'Filmes+data' pois alguns filmes possuiam mesmo nome, porem o ano de lançamento era diferente.

In [None]:
n_netflix = netflix_filmes.agg(countDistinct('Filme+data')).collect()[0].asDict()['count(Filme+data)']
print('Filmes Netflix: ', n_netflix)

Filmes Netflix:  17754


Dos filmes disponíveis na Amazon, quantos % estão disponíveis na Netflix?

In [None]:
join_df = amazon.join(netflix_filmes, amazon.product_title == netflix_filmes.Filme, 'inner')

n_amz_net = join_df.agg(countDistinct('Filme')).collect()[0].asDict()['count(Filme)']

print('Percentual de Filmes disponíveis na Amazon que estão também disponíveis na Netflix: ', '%.2f' % (n_amz_net/n_amazon*100))

Percentual de Filmes disponíveis na Amazon que estão também disponíveis na Netflix:  10.05


Qual ano de lançamento possui mais filmes na Netflix?

In [None]:
netflix_filmes_2 = netflix_filmes.dropDuplicates(['Filme+data'])


In [None]:
netflix_filmes_2.createOrReplaceTempView('netflix')
netflix_filmes_2 = spark.sql('SELECT ANO, COUNT(*) FROM netflix GROUP BY ANO')


In [None]:
ano_nf = netflix_filmes_2.orderBy(netflix_filmes_2['count(1)'].desc()).collect()[0].asDict()
ano_nf['ANO']

print('Ano com mais lançamentos na Netflix: ' + ano_nf['ANO'])

Ano com mais lançamentos na Netflix: 2004


Quais filmes que não estão disponíveis no catálogo da Netflix foram melhor avaliados
(notas 4 e 5)?

In [None]:
amazon_2 = amazon.dropDuplicates(['product_title'])
netflix_filmes_2 = netflix_filmes.dropDuplicates(['Filme'])

join_df2 = amazon_2.join(netflix_filmes_2, amazon_2.product_title == netflix_filmes_2.Filme, 'left')

join_df2 = join_df2.filter(join_df2.Filme.isNull())

join_df2.filter(join_df2['star_rating'] >= 4).select('product_title','star_rating').show(5)


+--------------------+-----------+
|       product_title|star_rating|
+--------------------+-----------+
|Six Feet Under Se...|          5|
|  Dominion, Season 2|          5|
|  The Last Airbender|          5|
|Game of Thrones S...|          5|
|           Cry Havoc|          5|
+--------------------+-----------+
only showing top 5 rows



Quais filmes que não estão disponíveis no catálogo da Netflix foram melhor avaliados
(notas 4 e 5)? 
- No caso foi feita uma união com o df contendo as avaliações dos clientes da Netflix com o df contendo o nome dos filmes disponíveis na Netflix.

In [None]:
join_nf = netflix_filmes.join(df, netflix_filmes._c0 == df.Movie_Id, 'inner')

join_nf.show(5)

+--------------------+----+---------------+---+--------+------+
|          Filme+data| Ano|          Filme|_c0|Movie_Id|Rating|
+--------------------+----+---------------+---+--------+------+
|Dinosaur Planet_2003|2003|Dinosaur Planet|  1|       1|     3|
|Dinosaur Planet_2003|2003|Dinosaur Planet|  1|       1|     5|
|Dinosaur Planet_2003|2003|Dinosaur Planet|  1|       1|     4|
|Dinosaur Planet_2003|2003|Dinosaur Planet|  1|       1|     4|
|Dinosaur Planet_2003|2003|Dinosaur Planet|  1|       1|     3|
+--------------------+----+---------------+---+--------+------+
only showing top 5 rows



In [None]:
mean_rating = join_nf.groupBy('_c0').agg(f.mean('Rating'), f.first('Filme'))
mean_rating.filter(mean_rating['avg(Rating)'] >= 4).select('_c0', 'avg(Rating)', 'first(Filme)').show(5, truncate = False)


+---+-----------------+---------------------------------------------------------------------------+
|_c0|avg(Rating)      |first(Filme)                                                               |
+---+-----------------+---------------------------------------------------------------------------+
|13 |4.552            |Lord of the Rings: The Return of the King: Extended Edition: Bonus Material|
|32 |4.071736785329018|ABC Primetime: Mel Gibson's The Passion of the Christ                      |
|33 |4.168650217706822|Aqua Teen Hunger Force: Vol. 1                                             |
|68 |4.14259927797834 |Invader Zim                                                                |
|76 |4.090385917400136|I Love Lucy: Season 2                                                      |
+---+-----------------+---------------------------------------------------------------------------+
only showing top 5 rows

