In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

SCIEZKA_DO_STEROWNIKA_JAR = "mysql-connector.jar" 

DB_PASS = "administrator"

DB_USER = "root"
DB_URL = "jdbc:mysql://localhost:3306/grupa_db"
DB_TABLE = "koledzy"


print(">>> Próba uruchomienia sesji Spark ze sterownikiem MySQL...")

try:
    spark = SparkSession.builder \
        .appName("MySQL_Zadanie_Lab") \
        .config("spark.driver.extraClassPath", SCIEZKA_DO_STEROWNIKA_JAR) \
        .getOrCreate()
    
    print("--- SUKCES! Sesja Spark uruchomiona. --- \n")

except Exception as e:
    print(f"BŁĄD KRYTYCZNY: Nie udało się uruchomić Sparka.")
    print("Sprawdź, czy nazwa pliku w 'SCIEZKA_DO_STEROWNIKA_JAR' jest poprawna.")
    print(e)
    raise SystemExit("Zatrzymano z powodu błędu Spark.")

print(f">>> Wczytywanie danych z tabeli '{DB_TABLE}'...")
try:
    df = spark.read \
        .format("jdbc") \
        .option("url", DB_URL) \
        .option("dbtable", DB_TABLE) \
        .option("user", DB_USER) \
        .option("password", DB_PASS) \
        .load()

    print(f"--- SUKCES! Pobrano dane z MySQL. Oto one ({df.count()} osób): ---")
    df.show()

except Exception as e:
    print(f"BŁĄD: Nie udało się odczytać danych z MySQL.")
    print("Sprawdź czy: 1. Serwer MySQL działa, 2. Hasło jest poprawne (DB_PASS), 3. Baza 'grupa_db' i tabela 'koledzy' istnieją.")
    print(e)
    spark.stop()
    raise SystemExit("Zatrzymano z powodu błędu bazy danych.")


# 4. WYFILTRUJ OSOBY, KTÓRYCH WIEK JEST WYŻSZY NIŻ 22 LATA
print("\n>>> Filtrowanie osób z wiekiem > 22:")

df_filtered = df.filter(df.wiek > 22)

print(f"--- SUKCES! Znaleziono {df_filtered.count()} osób starszych niż 22 lata: ---")
df_filtered.show()


# 5. DOPISZ DO BAZY DANYCH INFORMACJE O DODATKOWEJ OSOBIE
print("\n>>> Dodawanie nowej osoby ('Jan Testowy') do bazy MySQL...")

# Tworzymy dane dla nowej osoby
# Ważne: Lista kolumn musi pasować do bazy DOKŁADNIE, ale pomijamy 'id'
new_person_data = [
    ('Jan', 'Testowy', 'jan.test@mail.com', '999888777', 'M', 26, 'Zdalnie', 'PySpark')
]

# Tworzymy schemat dla nowej osoby (bez kolumny 'id')
new_person_schema = StructType([
    StructField("imie", StringType(), True),
    StructField("nazwisko", StringType(), True),
    StructField("email", StringType(), True),
    StructField("telefon", StringType(), True),
    StructField("plec", StringType(), True),
    StructField("wiek", IntegerType(), True),
    StructField("miasto", StringType(), True),
    StructField("ulubiona_technologia", StringType(), True)
])

new_person_df = spark.createDataFrame(data=new_person_data, schema=new_person_schema)

new_person_df.write \
    .format("jdbc") \
    .option("url", DB_URL) \
    .option("dbtable", DB_TABLE) \
    .option("user", DB_USER) \
    .option("password", DB_PASS) \
    .mode("append") \
    .save()

print("--- SUKCES! Nowa osoba dodana. ---")

print(f"\n>>> Weryfikacja. Pełna tabela '{DB_TABLE}' po aktualizacji:")
final_df = spark.read \
    .format("jdbc") \
    .option("url", DB_URL) \
    .option("dbtable", DB_TABLE) \
    .option("user", DB_USER) \
    .option("password", DB_PASS) \
    .load()

final_df.show()
print(f"Całkowita liczba osób w bazie: {final_df.count()}")
spark.stop()
print("\n>>> Sesja Spark zakończona. Zadanie wykonane.")