# Import necessary libraries

In [8]:
import jaydebeapi
import pandas as pd
import os
from config import ACCESS_TOKEN, UID
import warnings
warnings.filterwarnings("ignore")

# Get Connection Details and Establish Connection

In [9]:

# Add JVM option to fix Apache Arrow memory access
os.environ['JAVA_TOOL_OPTIONS'] = '--add-opens=java.base/java.nio=ALL-UNNAMED'

# Connection parameters
JDBC_URL = "jdbc:databricks://adb-3055430050503041.1.azuredatabricks.net:443/default;transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/6630450a2133788d;"
JDBC_DRIVER_PATH = "/Users/kiranbele/Documents/Projects/data-engineering-exploring-london-travel-network/Databricks-connection/DatabricksJDBC42-2.6.40.1071/DatabricksJDBC42.jar"

try:
    # Connect to Databricks
    conn = jaydebeapi.connect(
        "com.databricks.client.jdbc.Driver",
        JDBC_URL,
        {"PWD": ACCESS_TOKEN, "UID": UID},
        JDBC_DRIVER_PATH
    )

    # Execute query
    cursor = conn.cursor()
    query = "SELECT * FROM hive_metastore.default.tfl_journeys_final LIMIT 10"
    cursor.execute(query)

    # Fetch results and load into DataFrame
    results = cursor.fetchall()
    columns = [desc[0] for desc in cursor.description]  # Get column names
    df = pd.DataFrame(results, columns=columns)

    # Print results
    print("Query Result:")
    print(df)

    # Close resources
    cursor.close()
    conn.close()
    print("Connection closed.")
except Exception as e:
    print(f"Error: {e}")


Query Result:
   month  year  days report_date       journey_type  journeys_millions
0      1  2010    31  2010-06-30  Underground & DLR          96.836391
1      2  2010    28  2010-06-30  Underground & DLR          90.330504
2      3  2010    31  2010-06-30  Underground & DLR          90.038014
3      4  2010    30  2010-09-30  Underground & DLR          92.544093
4      5  2010    31  2010-09-30  Underground & DLR          88.662911
5      6  2010    30  2010-09-30  Underground & DLR          86.375702
6      7  2010    31  2010-12-31  Underground & DLR          94.986035
7      8  2010    31  2010-12-31  Underground & DLR          97.024794
8      9  2010    30  2010-12-31  Underground & DLR          97.029640
9     10  2010    31  2011-03-31  Underground & DLR          77.369979
Connection closed.


# Queries to answer some business questions

In [10]:

# Query 1: Most popular transport types
most_popular_types_query = """
SELECT journey_type,
    SUM(journeys_millions)/1000000 as total_journeys_millions
FROM hive_metastore.default.tfl_journeys_final
GROUP BY journey_type
ORDER BY total_journeys_millions DESC;
"""

# Query 2: Emirates Airline popularity by month and year
emirates_query = """
SELECT month, 
	year, 
	ROUND(journeys_millions,2) AS rounded_journeys_millions
FROM hive_metastore.default.tfl_journeys_final
WHERE journey_type = 'Emirates Airline' AND journeys_millions IS NOT NULL
ORDER BY rounded_journeys_millions DESC
LIMIT 5;
"""

# Query 3: Least popular years for Underground & DLR
underground_query = """
SELECT year,
	journey_type,
	SUM(journeys_millions) as total_journeys_millions
FROM hive_metastore.default.tfl_journeys_final
WHERE journey_type LIKE '%Underground%'
GROUP BY year, journey_type
ORDER BY total_journeys_millions
LIMIT 5;
"""

try:
    # Connect to Databricks
    conn = jaydebeapi.connect(
        "com.databricks.client.jdbc.Driver",
        JDBC_URL,
        {"PWD": ACCESS_TOKEN, "UID": UID},
        JDBC_DRIVER_PATH
    )

    # Create a cursor object
    cursor = conn.cursor()

    # Execute and fetch results for Query 1
    print("Executing Query 1: Most popular transport types")
    cursor.execute(most_popular_types_query)
    results_1 = cursor.fetchall()
    columns_1 = [desc[0] for desc in cursor.description]
    df1 = pd.DataFrame(results_1, columns=columns_1)
    print("Query 1 Results:")
    print(df1)

    # Execute and fetch results for Query 2
    print("\nExecuting Query 2: Emirates Airline popularity by month and year")
    cursor.execute(emirates_query)
    results_2 = cursor.fetchall()
    columns_2 = [desc[0] for desc in cursor.description]
    df2 = pd.DataFrame(results_2, columns=columns_2)
    print("Query 2 Results:")
    print(df2)

    # Execute and fetch results for Query 3
    print("\nExecuting Query 3: Least popular years for Underground & DLR")
    cursor.execute(underground_query)
    results_3 = cursor.fetchall()
    columns_3 = [desc[0] for desc in cursor.description]
    df3 = pd.DataFrame(results_3, columns=columns_3)
    print("Query 3 Results:")
    print(df3)

    # Close cursor and connection
    cursor.close()
    conn.close()
    print("\nConnection closed.")

except Exception as e:
    print(f"Error: {e}")


Executing Query 1: Most popular transport types
Query 1 Results:
        journey_type  total_journeys_millions
0                Bus                 0.024905
1  Underground & DLR                 0.015020
2         Overground                 0.001667
3           TfL Rail                 0.000411
4               Tram                 0.000315
5   Emirates Airline                 0.000015

Executing Query 2: Emirates Airline popularity by month and year
Query 2 Results:
   month  year  rounded_journeys_millions
0      5  2012                       0.53
1      6  2012                       0.38
2      4  2012                       0.24
3      5  2013                       0.19
4      5  2015                       0.19

Executing Query 3: Least popular years for Underground & DLR
Query 3 Results:
   year       journey_type  total_journeys_millions
0  2020  Underground & DLR               310.179316
1  2021  Underground & DLR               748.452544
2  2022  Underground & DLR              106

## Had trouble using databricks-sql-connector package 
### since pyarrow and apache-arrow could not work together for some reason. 
