In [28]:
# Prerequisite (run once if not installed): %pip install requests pandas

In [29]:
# Prerequisite (run once if not installed): %pip install pyspark

In [1]:
import requests
import zipfile
import io

# Download the daily AIS archive.
# A timeout keeps the cell from hanging forever on a stalled connection, and
# raise_for_status() turns an HTTP error (404, 500, ...) into an immediate,
# readable exception instead of a confusing BadZipFile further down.
url = "http://web.ais.dk/aisdata/aisdk-2024-05-04.zip"
response = requests.get(url, timeout=60)
response.raise_for_status()

# Open the ZIP archive from the in-memory response body and extract its
# members into the local "ais_data" directory.
with zipfile.ZipFile(io.BytesIO(response.content)) as z:
    z.extractall("ais_data")

In [17]:
import pandas as pd
import os

# Locate the extracted CSV file. Sorting makes the choice deterministic when
# several CSV files are present (os.listdir returns entries in arbitrary order).
csv_files = sorted(f for f in os.listdir("ais_data") if f.endswith('.csv'))
if not csv_files:
    # Fail with an explicit message rather than a bare IndexError.
    raise FileNotFoundError("No .csv file found in 'ais_data'; run the download cell first.")
csv_file = csv_files[0]

# Read the CSV file
df = pd.read_csv(f"ais_data/{csv_file}")

# Show the first few rows
print(df.head(20))

# Save the CSV file to the location the Spark cell reads from.
# Create the target directory first so to_csv does not fail when it is missing.
os.makedirs("C:/bigdata", exist_ok=True)
df.to_csv("C:/bigdata/aisdk-2024-05-04.csv", index=False)

            # Timestamp Type of mobile       MMSI   Latitude  Longitude  \
0   04/05/2024 00:00:00        Class A  230613000  56.856532  10.983877   
1   04/05/2024 00:00:00   Base Station    2190071  57.110042   8.648280   
2   04/05/2024 00:00:00   Base Station    2190068  56.447258  10.945872   
3   04/05/2024 00:00:00        Class A  351381000  57.390600   9.005983   
4   04/05/2024 00:00:00        Class A  219016683  56.800147   9.024962   
5   04/05/2024 00:00:00        Class A  219012563  57.048650  10.052285   
6   04/05/2024 00:00:00        Class A  219030053  57.058312   9.900780   
7   04/05/2024 00:00:00   Base Station    2190064  56.716572  11.519048   
8   04/05/2024 00:00:00        Class A  371519000  55.351183   6.181333   
9   04/05/2024 00:00:00   Base Station    2190064  56.716572  11.519048   
10  04/05/2024 00:00:00   Base Station    2190064  56.716572  11.519048   
11  04/05/2024 00:00:00        Class A  273391650  54.564675  12.291570   
12  04/05/2024 00:00:00  

In [30]:
# Prerequisite (run once if not installed): %pip install geopy

In [27]:
from geopy.distance import geodesic
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, sum as _sum, desc, unix_timestamp, lag
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.window import Window

# Create (or reuse) the Spark session
spark = SparkSession.builder \
    .appName("ShipDistanceAnalysis") \
    .getOrCreate()

# Define the data schema.
# NOTE(review): only the first five columns are declared, while the pandas
# preview of this file shows more columns and a header named "# Timestamp".
# Presumably Spark applies this schema positionally and ignores the rest;
# verify against the Spark CSV reader documentation.
schema = StructType([
    StructField("Timestamp", StringType(), True),
    StructField("Type", StringType(), True),
    StructField("MMSI", StringType(), True),
    StructField("Latitude", DoubleType(), True),
    StructField("Longitude", DoubleType(), True)
])

# Load the data
data = spark.read.csv("C:/bigdata/aisdk-2024-05-04.csv", schema=schema, header=True)

# Convert Timestamp (dd/MM/yyyy HH:mm:ss text) to unix time
data = data.withColumn("Timestamp", unix_timestamp(col("Timestamp"), "dd/MM/yyyy HH:mm:ss"))

# Drop rows with invalid latitude OR longitude values.
# Previously only latitude was checked, so out-of-range longitudes still
# reached the distance calculation. between() is inclusive on both ends, and
# rows with a null coordinate are dropped too (the comparison yields null).
data = data.filter(
    col("Latitude").between(-90, 90) & col("Longitude").between(-180, 180)
)

# Distance helper (later wrapped as a UDF) based on geopy's geodesic formula
def geodesic_distance(lat1, lon1, lat2, lon2):
    """Return the geodesic distance in kilometres between two positions.

    Args:
        lat1, lon1: latitude and longitude of the first position.
        lat2, lon2: latitude and longitude of the second position.

    Returns:
        0.0 when any of the four coordinates is None; otherwise the value of
        ``geodesic((lat1, lon1), (lat2, lon2)).kilometers``.
    """
    coordinates = (lat1, lon1, lat2, lon2)
    # A missing coordinate contributes no distance.
    if any(value is None for value in coordinates):
        return 0.0

    first_point = (lat1, lon1)
    second_point = (lat2, lon2)
    return geodesic(first_point, second_point).kilometers

# Wrap the Python distance function as a Spark UDF that returns a double
geodesic_udf = udf(geodesic_distance, DoubleType())

# Window specification: per ship (MMSI), ordered by time, so that lag() yields
# the previously reported position of the same ship (null for its first row).
windowSpec = Window.partitionBy("MMSI").orderBy("Timestamp")

data = data.withColumn("PrevLatitude", lag("Latitude").over(windowSpec)) \
           .withColumn("PrevLongitude", lag("Longitude").over(windowSpec))

# Distance between each position and the previous one of the same ship
data = data.withColumn("Distance", geodesic_udf(col("Latitude"), col("Longitude"), col("PrevLatitude"), col("PrevLongitude")))

# Aggregate distances per MMSI.
# The result is cached because two actions below (show and first) consume it;
# without the cache each action would re-run the whole pipeline, including the
# Python UDF over every row.
distance_per_ship = data.groupBy("MMSI").agg(_sum("Distance").alias("TotalDistance")).cache()

# Sort ships by travelled distance, descending
sorted_distances = distance_per_ship.orderBy(desc("TotalDistance"))

# Show the results
sorted_distances.show(10, truncate=False)

# Find the ship that travelled the longest distance
max_distance_ship = sorted_distances.first()

# NOTE(review): the recorded output of this cell reports totals such as
# ~91,785 km for a single day of data, which is not physically plausible.
# Position jumps between consecutive reports are presumably inflating the
# sums; consider discarding segments whose implied speed is unrealistic.

# Print the result
if max_distance_ship is not None:
    print(f"Laivas su MMSI {max_distance_ship['MMSI']} nuplaukė ilgiausią maršrutą: {max_distance_ship['TotalDistance']} km")
else:
    print("No results after distance calculation. Check the data and computations.")

# Stop the Spark session
spark.stop()


+---------+------------------+
|MMSI     |TotalDistance     |
+---------+------------------+
|219000962|91785.42917167238 |
|305036000|15695.34132887764 |
|992111851|12008.197585808928|
|255806476|10877.8652682289  |
|266473000|9886.37870008194  |
|218795000|8412.885077629566 |
|230352000|7406.838369166611 |
|218816000|6510.103345306211 |
|266460000|6417.114614086044 |
|218292000|6395.230655031582 |
+---------+------------------+
only showing top 10 rows

Laivas su MMSI 219000962 nuplaukė ilgiausią maršrutą: 91785.42917167238 km
