![picture](https://drive.google.com/uc?export=view&id=1K_GhwMAZaYPEqm-0ukMBxfmBew7N6GXA)<br>
<small>Rian Lopes</small><br>

Inscreva-se no canal <br>[Data Review](https://www.youtube.com/channel/UCYfY8KRS5nqoFBTuoLV0_jw) <br>
E siga nas redes sociais <br> [Instagram Data Review](https://www.instagram.com/data.review/)
<br>
<br>
<h1>Spark for Data Science and Machine Learning</h1>
<h2>Loading data</h2>


In [None]:
# Mount Google Drive so it can be used as a disk inside the Colab runtime
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


<h2>New York TLC Data</h2>
Os dados de registros de viagens do TLC (Taxi and Limousine Commission) de Nova York são conjuntos de informações detalhadas sobre viagens realizadas por táxis, carros de aplicativo (como Uber e Lyft) e outros serviços de transporte autorizados na cidade.

[New York City Taxi/Uber Data](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page)

In [None]:
# Fetch the January 2024 yellow-taxi trip records

import requests

url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet"
# stream=True defers the body download until iter_content() is consumed;
# the timeout keeps a stalled connection from hanging the cell forever.
response = requests.get(url, stream=True, timeout=60)
# Fail here on a 4xx/5xx answer instead of saving an error page as the dataset.
response.raise_for_status()
# NOTE(review): the payload is Parquet although the local name ends in ".csv".
# The name is kept as-is because the Spark cell further down reads this same path.
output_path = "/content/spark/yellow_tripdata_2024-01.csv"




In [None]:
# Create the download directory; exist_ok makes the cell safe to re-run
# (a plain `mkdir` reports an error when the directory already exists).
import os

os.makedirs("/content/spark", exist_ok=True)


In [None]:
# Stream the response body to disk chunk by chunk, skipping empty chunks
with open(output_path, "wb") as out_file:
    for piece in filter(None, response.iter_content(chunk_size=8192)):
        out_file.write(piece)

In [None]:
# Spark initialisation
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local")
    .appName("loadingDataSpark")
    .getOrCreate()
)

# Location of the file written by the download cells above
data_path = "/content/spark/yellow_tripdata_2024-01.csv"


In [None]:

# Load the Parquet file into a Spark DataFrame.
# Parquet files carry their own schema, so the CSV-style "header" and
# "inferSchema" reader options are not needed here.
df = spark.read.parquet(data_path)



In [None]:
# Print the DataFrame schema
df.printSchema()


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [None]:
# Preview the first rows of the DataFrame
df.show()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-01-01 00:57:55|  2024-01-01 01:17:43|              1|         1.72|         1|                 N|         186|          79|           2|       17.7|  1.0|    0.5|       0.

In [None]:
# Shape of the DataFrame: row count first, column count second
num_rows, num_columns = df.count(), len(df.columns)

print(f"Dimensions: ({num_rows}, {num_columns})")

Dimensions: (2964624, 19)


In [None]:
# Stop the current Spark session
spark.stop()

In [None]:
import os
import requests

def download_file(url, output_path, timeout=60):
    """Download a file from a URL to a local path.

    The body is streamed into a temporary ``<output_path>.part`` file that is
    renamed onto ``output_path`` only after the whole transfer succeeded, so an
    interrupted download is never mistaken for a finished one by the
    "already exists" check on the next run.

    Args:
        url: Address of the file to fetch.
        output_path: Destination path on the local filesystem. Missing parent
            directories are created.
        timeout: Value passed to ``requests.get`` as its ``timeout`` argument.

    Returns:
        True when ``output_path`` exists after the call (already present or
        freshly downloaded), False when the server answered with a non-200
        status code.

    Raises:
        Whatever ``requests`` or the filesystem raises during the transfer; the
        temporary file is removed before the exception propagates.
    """
    if os.path.exists(output_path):
        print(f"File {output_path} already exists. Skipping download.")
        return True

    print(f"Downloading {url} to {output_path}")
    response = requests.get(url, stream=True, timeout=timeout)
    try:
        if response.status_code != 200:
            print(f"Failed to download {url}. Status code: {response.status_code}")
            return False

        parent_dir = os.path.dirname(output_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        partial_path = f"{output_path}.part"
        try:
            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        except BaseException:
            # Do not leave a truncated file behind for a later run to trip over.
            if os.path.exists(partial_path):
                os.remove(partial_path)
            raise

        # Atomic publish: the final name only ever points at a complete file.
        os.replace(partial_path, output_path)
        return True
    finally:
        # Release the streamed connection on every exit path.
        response.close()

In [None]:
from pyspark.sql import SparkSession

# Initialise the Spark session
spark = (
    SparkSession.builder
    .master("local")
    .appName("loadingDataSpark1")
    .getOrCreate()
)



In [None]:
import os
from datetime import datetime
from dateutil.relativedelta import relativedelta

# Reference date: the most recent month to fetch (January 2024).
ref_date = datetime(2024, 1, 1)
# Number of monthly files to fetch, counting backwards from ref_date.
N_MONTHS = 12

# Build the list of local file paths for the last N_MONTHS months
file_paths = []
for i in range(N_MONTHS):
    month_date = ref_date - relativedelta(months=i)

    file_name = f"yellow_tripdata_{month_date.year}-{month_date.month:02d}.parquet"
    file_url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/{file_name}"
    output_path = f"/content/spark/{file_name}"

    # Fetch the file unless it is already on disk
    download_file(file_url, output_path)

    # Only keep paths that really exist: a failed download would otherwise
    # make the Spark load fail on a missing file.
    if os.path.exists(output_path):
        file_paths.append(output_path)
    else:
        print(f"Skipping {output_path}: file is not available locally.")

print("Arquivos a serem carregados:", file_paths)

Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet to /content/spark/yellow_tripdata_2024-01.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-12.parquet to /content/spark/yellow_tripdata_2023-12.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-11.parquet to /content/spark/yellow_tripdata_2023-11.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-10.parquet to /content/spark/yellow_tripdata_2023-10.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-09.parquet to /content/spark/yellow_tripdata_2023-09.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-08.parquet to /content/spark/yellow_tripdata_2023-08.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-07.parquet to /content/spark/yellow_tripdata_2023-07.parquet
Downlo

In [None]:
# Load every file in file_paths into a single DataFrame.
# Parquet files carry their own schema, so the CSV-style "header" and
# "inferSchema" reader options are not needed here.
df = spark.read.format("parquet").load(file_paths)


In [None]:
# Print the schema and the first 5 rows of the combined DataFrame
df.printSchema()
df.show(5)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------

In [None]:

# Shape of the combined DataFrame: row count first, column count second
num_rows, num_columns = df.count(), len(df.columns)

print(f"Dimensions: ({num_rows}, {num_columns})")

Dimensions: (38208084, 19)


In [None]:
# toPandas() collects every row into the driver's memory. The saved output of
# this cell shows the JVM connection dying when the full DataFrame
# (tens of millions of rows) is converted, so cap the row count first.
# NOTE(review): 10,000 presumably matches the preview saved in the next
# cell's output, whose index runs up to 9998+ — confirm the intended size.
PANDAS_PREVIEW_ROWS = 10_000

try:
    pandas_df = df.limit(PANDAS_PREVIEW_ROWS).toPandas()
    print(pandas_df)
except Exception as e:
    print(f"Error during toPandas conversion: {str(e)}")



ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


Error during toPandas conversion: [Errno 111] Connection refused


In [None]:
# Display the pandas DataFrame produced by the conversion cell
pandas_df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,1,2023-10-01 00:16:44,2023-10-01 00:16:49,1,0.00,1,N,168,168,2,3.0,1.0,0.5,0.00,0.0,1.0,5.50,0.0,0.0
1,1,2023-10-01 00:23:24,2023-10-01 00:23:47,1,0.00,1,N,168,168,2,3.0,1.0,0.5,0.00,0.0,1.0,5.50,0.0,0.0
2,1,2023-10-01 00:21:18,2023-10-01 00:27:31,1,0.90,1,N,161,186,1,6.5,3.5,0.5,2.90,0.0,1.0,14.40,2.5,0.0
3,1,2023-10-01 00:17:39,2023-10-01 00:17:47,0,0.00,1,N,255,255,3,3.0,1.0,0.5,0.00,0.0,1.0,5.50,0.0,0.0
4,2,2023-10-01 00:16:15,2023-10-01 00:22:57,2,1.41,1,N,151,239,1,10.0,1.0,0.5,3.00,0.0,1.0,18.00,2.5,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,1,2023-10-01 01:53:11,2023-10-01 02:14:49,1,4.10,1,N,48,146,1,21.9,3.5,0.5,0.00,0.0,1.0,26.90,2.5,0.0
9996,2,2023-10-01 01:38:08,2023-10-01 01:43:07,1,0.97,1,N,148,211,1,7.2,1.0,0.5,2.44,0.0,1.0,14.64,2.5,0.0
9997,2,2023-10-01 01:49:32,2023-10-01 02:02:53,1,1.48,1,N,113,148,1,12.8,1.0,0.5,3.56,0.0,1.0,21.36,2.5,0.0
9998,2,2023-10-01 01:05:17,2023-10-01 01:12:07,2,0.82,1,N,170,170,1,7.9,1.0,0.5,2.58,0.0,1.0,15.48,2.5,0.0
