# Init

In [137]:
import os
import sys
from pyspark.sql import SparkSession
from typing import NamedTuple
from datetime import datetime

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder.getOrCreate()

In [138]:
from pyspark import SparkContext, SparkConf
app_name = "Lab1"
spark.stop()
conf = SparkConf().setAppName(app_name).setMaster('local[1]')
sc = SparkContext(conf=conf)
sc

In [139]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: datetime

    for station in stations:
        yield Station(
            station_id=int(station[0]),
            name=station[1],
            lat=float(station[2]),
            long=float(station[3]),
            dockcount=int(station[4]),
            landmark=station[5],
            installation=datetime.strptime(station[6], '%m/%d/%Y')
        )


def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str

    for trip in trips:
        yield Trip(
            trip_id=int(trip[0]) if trip[0] else None,
            duration=int(trip[1]) if trip[1] else None,
            start_date=datetime.strptime(trip[2], '%m/%d/%Y %H:%M') if trip[2] else None,
            start_station_name=trip[3],
            start_station_id=int(trip[4]),
            end_date=datetime.strptime(trip[5], '%m/%d/%Y %H:%M') if trip[5] else None,
            end_station_name=trip[6],
            end_station_id=trip[7],
            bike_id=int(trip[8]),
            subscription_type=trip[9],
            zip_code=trip[10]
        )

In [145]:
tripData = sc.textFile("data/trips.csv")
stationData = sc.textFile("data/stations.csv")

header = stationData.first()
station_data_filter = stationData.filter(lambda line: line != header)
station_data_map = station_data_filter.map(lambda line: line.split(','))

print(station_data_map.first())

stations_mapped = station_data_map.mapPartitions(initStation)

stations_mapped.first()

['2', 'San Jose Diridon Caltrain Station', '37.329732', '-121.90178200000001', '27', 'San Jose', '8/6/2013']


Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

In [146]:
header = tripData.first()
trips_data_filter = tripData.filter(lambda line: line != header)
trips_data_map = trips_data_filter.map(lambda line: line.split(','))
trips_mapped = trips_data_map.mapPartitions(initTrip)

trips_mapped.first()

Trip(trip_id=4576, duration=63, start_date=datetime.datetime(2013, 8, 29, 14, 42), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id='66', bike_id=520, subscription_type='Subscriber', zip_code='94127')

# Найти велосипед с максимальным временем пробега.

In [142]:
# Использование reduce для нахождения поездки с максимальной длительностью
max_duration_trip = trips_mapped.reduce(lambda trip1, trip2: trip1 if trip1.duration > trip2.duration else trip2)

max_duration_trip.bike_id

535

# Найти наибольшее геодезическое расстояние между станциями.

In [143]:
from math import radians, cos, sin, asin, sqrt

def haversine(lon1, lat1, lon2, lat2):
    """
    Вычислить геодезическое расстояние между двумя точками на земле, заданными в градусах.
    """
    # Конвертировать десятичные градусы в радианы 
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # Формула гаверсинуса
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a)) 
    r = 6371 # Радиус Земли в километрах. Используйте 3956 для миль
    return c * r

In [148]:
stations_cartesian = stations_mapped.cartesian(stations_mapped)

def calculate_distance(record):
    station1, station2 = record
    distance = haversine(station1.long, station1.lat, station2.long, station2.lat)
    return (station1.station_id, station2.station_id, distance)

distances = stations_cartesian.map(calculate_distance)

max_distance = distances.max(key=lambda x: x[2])
print(f"Максимальное расстояние между станциями {max_distance[0]} и {max_distance[1]}: {max_distance[2]} км")

Максимальное расстояние между станциями 16 и 60: 69.92087595428183 км


# Найти путь велосипеда с максимальным временем пробега через станции.

In [150]:
# Суммирование длительности поездок для каждого велосипеда и нахождение максимума
max_duration_bike_id, _ = trips_mapped \
    .map(lambda trip: (trip.bike_id, trip.duration)) \
    .reduceByKey(lambda a, b: a + b) \
    .max(key=lambda x: x[1])

# Фильтруем поездки для велосипеда с максимальной длительностью и сортируем по дате начала
max_bike_trips_sorted = trips_mapped \
    .filter(lambda trip: trip.bike_id == max_duration_bike_id) \
    .sortBy(lambda trip: trip.start_date)

# Выводим первую поездку для демонстрации
first_trip = max_bike_trips_sorted.first()
print(f"Bike {first_trip.bike_id} started its longest path from {first_trip.start_station_name} on {first_trip.start_date} to {first_trip.end_station_name}")

Bike 535 started its longest path from Post at Kearney on 2013-08-29 19:32:00 to San Francisco Caltrain (Townsend at 4th)


# Найти количество велосипедов в системе.

In [151]:
# Извлекаем bike_id из каждой поездки
bike_ids = trips_mapped.map(lambda trip: trip.bike_id)

# Получаем уникальные bike_id
unique_bike_ids = bike_ids.distinct()

# Подсчитываем количество уникальных велосипедов
bike_count = unique_bike_ids.count()

print(f"Total number of bikes in the system: {bike_count}")

Total number of bikes in the system: 700


# Найти пользователей потративших на поездки более 3 часов.

In [153]:
long_trips = trips_mapped.filter(lambda x: x.duration > (3 * 60 * 60))

unique_zip_codes = (
    long_trips
    .map(lambda trip: trip.zip_code)  
    .filter(lambda zip_code: zip_code != "")  
    .distinct()  
)

first_10_users = unique_zip_codes.take(10)

first_10_users

['58553',
 '94301',
 '94039',
 '94133',
 '93726',
 '94123',
 '4517',
 '29200',
 '45322',
 '94080']