In [None]:
import os

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Connection settings for the Spark/MySQL job. Each value may be overridden by
# an environment variable of the same name; the literals below are only the
# fallback defaults used when the variable is not set.
JAR_FILE_PATH = os.environ.get("JAR_FILE_PATH", "mysql-connector.jar")
# NOTE(review): a database password should not live in source control. Supply
# DB_PASS through the environment and drop this fallback once every runner does.
DB_PASS = os.environ.get("DB_PASS", "DevOpsMaster!")
DB_USER = os.environ.get("DB_USER", "root")
DB_URL = os.environ.get("DB_URL", "jdbc:mysql://localhost:3306/company_db")
DB_TABLE = os.environ.get("DB_TABLE", "developers")

print(">>> Start Spark session...")

# Create (or reuse) the Spark session. The connector JAR named by JAR_FILE_PATH
# is passed through the spark.driver.extraClassPath setting.
try:
    spark = (
        SparkSession.builder
        .appName("MySQL_Task_New_Version")
        .config("spark.driver.extraClassPath", JAR_FILE_PATH)
        .getOrCreate()
    )
    print("--- SUCCESS! Spark session started. ---\n")
except Exception as startup_error:
    # Every later step needs the session, so report the failure and abort.
    print("CRITICAL ERROR: Failed to start Spark.")
    print(startup_error)
    raise SystemExit("Stopped due to Spark error.")

print(f">>> Loading data from '{DB_TABLE}'...")
# Read the whole table over JDBC, report how many rows came back and show them.
try:
    df = (
        spark.read.format("jdbc")
        .options(url=DB_URL, dbtable=DB_TABLE, user=DB_USER, password=DB_PASS)
        .load()
    )
    print(f"--- SUCCESS! Loaded {df.count()} records. ---")
    df.show()
except Exception as read_error:
    # Report the failure, stop the session, then abort the script.
    print("ERROR: Failed to read from MySQL.")
    print(read_error)
    spark.stop()
    raise SystemExit("Stopped due to DB error.")

print("\n>>> Filtering age > 30:")
# Keep only the rows whose "age" column is greater than 30 and display them.
df_filtered = df.where(df["age"] > 30)
df_filtered.show()

print("\n>>> Adding new person...")
# One row to append; the tuple order matches the field order of the schema below.
new_person_data = [
    ('Linus', 'Torvalds', 'linus@linux.org', '555444333', 'M', 53, 'Helsinki', 'C/Linux'),
]

# Explicit schema for the new row: every column is a nullable string except
# "age", which is a nullable integer.
new_person_schema = StructType([
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("favorite_tech", StringType(), True),
])

new_person_df = spark.createDataFrame(data=new_person_data, schema=new_person_schema)

# Append the row to the table over JDBC. A failed write is handled the same way
# as a failed read earlier in the script: report it, stop the Spark session and
# abort, instead of leaving the session running behind a raw traceback.
# NOTE(review): mode("append") adds the row on every run — confirm that
# duplicate rows on re-runs are acceptable.
try:
    (
        new_person_df.write.format("jdbc")
        .options(url=DB_URL, dbtable=DB_TABLE, user=DB_USER, password=DB_PASS)
        .mode("append")
        .save()
    )
except Exception as e:
    print("ERROR: Failed to write to MySQL.")
    print(e)
    spark.stop()
    raise SystemExit("Stopped due to DB error.") from e
else:
    print("--- SUCCESS! Person added. ---")

print("\n>>> Verification:")
# Re-read the table to confirm the append, show it and print the row count.
# The session is stopped in `finally` so it is released whether or not the
# read succeeds; failures are reported the same way as the initial load.
try:
    final_df = (
        spark.read.format("jdbc")
        .options(url=DB_URL, dbtable=DB_TABLE, user=DB_USER, password=DB_PASS)
        .load()
    )
    final_df.show()
    print(f"Total records: {final_df.count()}")
except Exception as e:
    print("ERROR: Failed to read from MySQL.")
    print(e)
    raise SystemExit("Stopped due to DB error.") from e
finally:
    spark.stop()