# Inicialização

In [1]:
import os
import requests
import zipfile

url = 'https://files.grouplens.org/datasets/movielens/ml-latest-small.zip'
arquivo_existe = os.path.exists('./dados/ml-latest-small.zip')

if not arquivo_existe:
    with requests.get(url, allow_redirects=True, verify=False) as resp:

        with open('dados/ml-latest-small.zip', 'wb') as arquivo:
            for chunk in resp.iter_content(chunk_size=128):
                arquivo.write(chunk)

        with zipfile.ZipFile('dados/ml-latest-small.zip') as arquivo_zip:
            arquivo_zip.extractall('dados')

    print("Download concluído")

In [2]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import Row

spark = SparkSession.builder.getOrCreate()

DIRETORIO='dados/ml-latest-small/'

# Criando um DataFrame

## Usando um DataFrame do pandas

In [3]:
dados = pd.read_csv(DIRETORIO + 'movies.csv')
dados.head()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [4]:
df = spark.createDataFrame(dados)
df.head(5)

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
 Row(movieId=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId=4, title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
 Row(movieId=5, title='Father of the Bride Part II (1995)', genres='Comedy')]

## Usando uma lista de rows

In [5]:
from pyspark.sql import Row

In [6]:
df_teste = spark.createDataFrame([
    Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
    Row(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
    Row(movieId=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
    Row(movieId=4, title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
    Row(movieId=5, title='Father of the Bride Part II (1995)', genres='Comedy')
])

In [7]:
df_teste.head(5)

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
 Row(movieId=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId=4, title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
 Row(movieId=5, title='Father of the Bride Part II (1995)', genres='Comedy')]

## Usando uma lista de rows com um esquema

In [8]:
from pyspark.sql import Row

In [9]:
esquema = 'movieId int, title string, genres string'
df_teste = spark.createDataFrame([
    Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
    Row(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
    Row(movieId=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
    Row(movieId=4, title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
    Row(movieId=5, title='Father of the Bride Part II (1995)', genres='Comedy')
], schema=esquema)
df_teste.head(5)

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
 Row(movieId=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId=4, title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
 Row(movieId=5, title='Father of the Bride Part II (1995)', genres='Comedy')]

In [10]:
df_teste.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



# Visualizando os dados

In [11]:
df.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [12]:
df.show(2)

+-------+----------------+--------------------+
|movieId|           title|              genres|
+-------+----------------+--------------------+
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 2 rows



In [13]:
df.show(n=2, truncate=False, vertical=True)

-RECORD 0----------------------------------------------
 movieId | 1                                           
 title   | Toy Story (1995)                            
 genres  | Adventure|Animation|Children|Comedy|Fantasy 
-RECORD 1----------------------------------------------
 movieId | 2                                           
 title   | Jumanji (1995)                              
 genres  | Adventure|Children|Fantasy                  
only showing top 2 rows



In [14]:
df.columns

['movieId', 'title', 'genres']

In [15]:
df.select('movieId', 'genres').show(2)

+-------+--------------------+
|movieId|              genres|
+-------+--------------------+
|      1|Adventure|Animati...|
|      2|Adventure|Childre...|
+-------+--------------------+
only showing top 2 rows



In [16]:
df.take(1)

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy')]

In [17]:
df.tail(1)

[Row(movieId=193609, title='Andrew Dice Clay: Dice Rules (1991)', genres='Comedy')]

In [18]:
# comentado pois retorna todos os dados
# df.collect()
print('df.collect()')

df.collect()


# Selecionando e acessando dados

In [19]:
df.movieId

Column<'movieId'>

In [20]:
df.select(df.movieId).describe().show()

+-------+------------------+
|summary|           movieId|
+-------+------------------+
|  count|              9742|
|   mean|42200.353623485935|
| stddev| 52160.49485443831|
|    min|                 1|
|    max|            193609|
+-------+------------------+



In [21]:
df.withColumn('title_sub2', df.title.substr(1, 2)).show()

+-------+--------------------+--------------------+----------+
|movieId|               title|              genres|title_sub2|
+-------+--------------------+--------------------+----------+
|      1|    Toy Story (1995)|Adventure|Animati...|        To|
|      2|      Jumanji (1995)|Adventure|Childre...|        Ju|
|      3|Grumpier Old Men ...|      Comedy|Romance|        Gr|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|        Wa|
|      5|Father of the Bri...|              Comedy|        Fa|
|      6|         Heat (1995)|Action|Crime|Thri...|        He|
|      7|      Sabrina (1995)|      Comedy|Romance|        Sa|
|      8| Tom and Huck (1995)|  Adventure|Children|        To|
|      9| Sudden Death (1995)|              Action|        Su|
|     10|    GoldenEye (1995)|Action|Adventure|...|        Go|
|     11|American Presiden...|Comedy|Drama|Romance|        Am|
|     12|Dracula: Dead and...|       Comedy|Horror|        Dr|
|     13|        Balto (1995)|Adventure|Animati...|    

In [22]:
df.filter(df.movieId == 6).show()

+-------+-----------+--------------------+
|movieId|      title|              genres|
+-------+-----------+--------------------+
|      6|Heat (1995)|Action|Crime|Thri...|
+-------+-----------+--------------------+



# Lendo/escrevendo dados

In [23]:
df_csv = spark.read.csv(DIRETORIO + 'movies.csv', header=True)
df_csv.write.parquet(DIRETORIO + 'movies.parquet')

In [24]:
df_parquet = spark.read.parquet(DIRETORIO + 'movies.parquet')
df_parquet.write.csv(DIRETORIO + 'movies_teste.csv', header=True)