In [None]:
#Pobieranie danych

In [18]:
import os
import requests
from zipfile import ZipFile

data_dir = "/home/jovyan/data/citibike/"
os.makedirs(data_dir, exist_ok=True)

files = [
    "2017-citibike-tripdata.zip",
    # "2018-citibike-tripdata.zip",
    # "2019-citibike-tripdata.zip",
]

base_url = "https://s3.amazonaws.com/tripdata/"

for file_name in files:
    zip_path = os.path.join(data_dir, file_name)
    if not os.path.exists(zip_path):
        print(f"Pobieranie {file_name} …")
        r = requests.get(base_url + file_name, stream=True)
        if r.status_code == 200:
            with open(zip_path, "wb") as f:
                for chunk in r.iter_content(8192):
                    f.write(chunk)
            print(f"Pobrano {file_name}")
        else:
            print(f"Błąd pobierania: {r.status_code}")
    else:
        print(f"{file_name} już istnieje, pomijam pobieranie")

    print(f"Rozpakowywanie {file_name} …")
    with ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(data_dir)
    print(f"Gotowe: {file_name}\n")


Pobieranie 2017-citibike-tripdata.zip …
Pobrano 2017-citibike-tripdata.zip
Rozpakowywanie 2017-citibike-tripdata.zip …
Gotowe: 2017-citibike-tripdata.zip



In [19]:
#delete zip files
import os
import glob

zip_files = glob.glob("/home/jovyan/data/citibike/*.zip")
for f in zip_files:
    os.remove(f)

In [20]:
#delete files other than month library (4_April etc)

import os
import shutil

base_path = "/home/jovyan/data/citibike/"

for year_dir in os.listdir(base_path):
    year_path = os.path.join(base_path, year_dir)
    if os.path.isdir(year_path):
        # iteracja po elementach w katalogu rocznym
        for item in os.listdir(year_path):
            item_path = os.path.join(year_path, item)
            if not os.path.isdir(item_path):
                print(f"Usuwam plik: {item_path}")
                os.remove(item_path)


Usuwam plik: /home/jovyan/data/citibike/2019-citibike-tripdata/.DS_Store
Usuwam plik: /home/jovyan/data/citibike/2017-citibike-tripdata/.DS_Store
Usuwam plik: /home/jovyan/data/citibike/2018-citibike-tripdata/201810-citibike-tripdata.csv
Usuwam plik: /home/jovyan/data/citibike/2018-citibike-tripdata/201806-citibike-tripdata.csv
Usuwam plik: /home/jovyan/data/citibike/2018-citibike-tripdata/201804-citibike-tripdata_2.csv
Usuwam plik: /home/jovyan/data/citibike/2018-citibike-tripdata/201802-citibike-tripdata.csv
Usuwam plik: /home/jovyan/data/citibike/2018-citibike-tripdata/201801-citibike-tripdata.csv
Usuwam plik: /home/jovyan/data/citibike/2018-citibike-tripdata/201809-citibike-tripdata.csv
Usuwam plik: /home/jovyan/data/citibike/2018-citibike-tripdata/201805-citibike-tripdata.csv
Usuwam plik: /home/jovyan/data/citibike/2018-citibike-tripdata/201807-citibike-tripdata.csv
Usuwam plik: /home/jovyan/data/citibike/2018-citibike-tripdata/201803-citibike-tripdata.csv
Usuwam plik: /home/jovya

In [6]:
import os
import pandas as pd

# katalog z danymi Citibike
base_path = "/home/jovyan/data/citibike/"

# katalog, gdzie zapiszemy scalone dane
output_path = "/home/jovyan/data/citibike_merged/"
os.makedirs(output_path, exist_ok=True)

# wzorcowe kolumny
standard_cols = [
    'tripduration', 'starttime', 'stoptime', 'start station id', 'start station name',
    'start station latitude', 'start station longitude', 'end station id', 'end station name',
    'end station latitude', 'end station longitude', 'bikeid', 'usertype', 'birth year', 'gender'
]

# iterujemy po rocznikach
for year_dir in sorted(os.listdir(base_path)):
    year_path = os.path.join(base_path, year_dir)
    if os.path.isdir(year_path):
        print(f"Wczytywanie danych dla: {year_dir}")
        year_dfs = []

        # iteracja po podkatalogach miesięcznych
        for month_dir in sorted(os.listdir(year_path)):
            month_path = os.path.join(year_path, month_dir)
            if os.path.isdir(month_path):
                # wczytujemy wszystkie CSV w podkatalogu miesiąca
                for csv_file in sorted(os.listdir(month_path)):
                    if csv_file.endswith(".csv"):
                        file_path = os.path.join(month_path, csv_file)
                        print(f"  - {file_path}")
                        df = pd.read_csv(file_path)

                        # dla roku 2017 filtrujemy tylko standardowe kolumny
                        if "2017" in year_dir:
                            df = df[[c for c in df.columns if c in standard_cols]]

                        year_dfs.append(df)

        # scalanie wszystkich miesięcy w jeden DataFrame roczny
        if year_dfs:
            year_df = pd.concat(year_dfs, ignore_index=True)
            print(f"Scalone dane dla {year_dir}: {year_df.shape}")

            # zapis do CSV
            csv_out = os.path.join(output_path, f"{year_dir}.csv")
            year_df.to_csv(csv_out, index=False)
            print(f"Zapisano CSV: {csv_out}")

            # zapis do Parquet
            parquet_out = os.path.join(output_path, f"{year_dir}.parquet")
            year_df.to_parquet(parquet_out, index=False)
            print(f"Zapisano Parquet: {parquet_out}")


Wczytywanie danych dla: 2017-citibike-tripdata
  - /home/jovyan/data/citibike/2017-citibike-tripdata/10_October/201710-citibike-tripdata.csv_1.csv
  - /home/jovyan/data/citibike/2017-citibike-tripdata/10_October/201710-citibike-tripdata.csv_2.csv
  - /home/jovyan/data/citibike/2017-citibike-tripdata/11_November/201711-citibike-tripdata.csv_1.csv
  - /home/jovyan/data/citibike/2017-citibike-tripdata/11_November/201711-citibike-tripdata.csv_2.csv
  - /home/jovyan/data/citibike/2017-citibike-tripdata/12_December/201712-citibike-tripdata.csv_1.csv
  - /home/jovyan/data/citibike/2017-citibike-tripdata/1_January/201701-citibike-tripdata.csv_1.csv
  - /home/jovyan/data/citibike/2017-citibike-tripdata/2_February/201702-citibike-tripdata.csv_1.csv
  - /home/jovyan/data/citibike/2017-citibike-tripdata/3_March/201703-citibike-tripdata.csv_1.csv
  - /home/jovyan/data/citibike/2017-citibike-tripdata/4_April/201704-citibike-tripdata.csv_1.csv
  - /home/jovyan/data/citibike/2017-citibike-tripdata/4_A

In [11]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CitiBike Analysis") \
    .getOrCreate()

# Wczytywanie CSV
df_2017_csv = spark.read.option("header", "true").csv("/home/jovyan/data/citibike_merged/2017-citibike-tripdata.csv")
df_2018_csv = spark.read.option("header", "true").csv("/home/jovyan/data/citibike_merged/2018-citibike-tripdata.csv")
df_2019_csv = spark.read.option("header", "true").csv("/home/jovyan/data/citibike_merged/2019-citibike-tripdata.csv")

# Wczytywanie Parquet
df_2017_parquet = spark.read.parquet("/home/jovyan/data/citibike_merged/2017-citibike-tripdata.parquet")
df_2018_parquet = spark.read.parquet("/home/jovyan/data/citibike_merged/2018-citibike-tripdata.parquet")
df_2019_parquet = spark.read.parquet("/home/jovyan/data/citibike_merged/2019-citibike-tripdata.parquet")


In [13]:
# Lista DataFrame'ów, które masz w Sparku
dfs = {
    "2017": df_2017_csv,
    "2017_p": df_2017_parquet,
    "2018": df_2018_csv,
    "2018_p": df_2018_parquet,
    "2019": df_2019_csv,
    "2019_p": df_2019_parquet,    
}


# Pętla testowa
for year, df in dfs.items():
    print(f"Rok: {year}")
    print(f"Liczba rekordów: {df.count()}")
    print(f"Liczba kolumn: {len(df.columns)}")
    print("Schemat kolumn:")
    df.printSchema()
    print("Przykładowe 5 wierszy:")
    df.show(5)
    print("-"*50)


Rok: 2017
Liczba rekordów: 16364657
Liczba kolumn: 15
Schemat kolumn:
root
 |-- tripduration: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- stoptime: string (nullable = true)
 |-- start station id: string (nullable = true)
 |-- start station name: string (nullable = true)
 |-- start station latitude: string (nullable = true)
 |-- start station longitude: string (nullable = true)
 |-- end station id: string (nullable = true)
 |-- end station name: string (nullable = true)
 |-- end station latitude: string (nullable = true)
 |-- end station longitude: string (nullable = true)
 |-- bikeid: string (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birth year: string (nullable = true)
 |-- gender: string (nullable = true)

Przykładowe 5 wierszy:
+------------+-------------------+-------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+----------------