In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Start (or reuse) the notebook's Spark session.
spark = SparkSession.builder.appName("CreatePySparkTables").getOrCreate()

# Schemas for the two sample tables; every column is declared nullable.
customers_schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("customer_id", IntegerType()),
            ("customer_name", StringType()),
        )
    ]
)
orders_schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("order_id", IntegerType()),
            ("customer_id", IntegerType()),
            ("product_name", StringType()),
        )
    ]
)

# Sample rows: (customer_id, customer_name) and (order_id, customer_id, product_name).
customers_data = [
    (1, "Daniel"),
    (2, "Diana"),
    (3, "Elizabeth"),
    (4, "Jhon"),
]
orders_data = [
    (10, 1, "A"),
    (20, 1, "B"),
    (30, 1, "D"),
    (40, 1, "C"),
    (50, 2, "A"),
    (60, 3, "A"),
    (70, 3, "B"),
    (80, 3, "D"),
    (90, 4, "C"),
]

customers_df = spark.createDataFrame(customers_data, schema=customers_schema)
orders_df = spark.createDataFrame(orders_data, schema=orders_schema)

# Register both DataFrames as temporary views so Spark SQL can query them by name.
customers_df.createOrReplaceTempView("Customers")
orders_df.createOrReplaceTempView("Orders")

# Display both tables as a quick sanity check.
customers_df.show()
orders_df.show()


+-----------+-------------+
|customer_id|customer_name|
+-----------+-------------+
|          1|       Daniel|
|          2|        Diana|
|          3|    Elizabeth|
|          4|         Jhon|
+-----------+-------------+

+--------+-----------+------------+
|order_id|customer_id|product_name|
+--------+-----------+------------+
|      10|          1|           A|
|      20|          1|           B|
|      30|          1|           D|
|      40|          1|           C|
|      50|          2|           A|
|      60|          3|           A|
|      70|          3|           B|
|      80|          3|           D|
|      90|          4|           C|
+--------+-----------+------------+




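Python's built-in `sum` cannot aggregate PySpark columns: inside `groupBy(...).agg(...)` it raises a `TypeError`, because a `Column` is not iterable. Use the Spark aggregate `pyspark.sql.functions.sum` instead, combined with `when(...).otherwise(0)` to turn each row into a 0/1 flag, so that summing the flags counts the matching rows (conditional aggregation).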

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
import logging
from datetime import datetime
import traceback
import IPython

def log_error_and_info(exception, notebook_name=None):
    """Log an exception together with notebook and time context, best effort.

    Emits ERROR-level records through the root ``logging`` module: the notebook
    name, the current local time, the exception message and its formatted
    traceback. Any failure while logging is reported with ``print`` instead of
    being raised, so a logging problem never breaks the caller's error handling.

    Args:
        exception: The exception to report. Its own ``__traceback__`` is
            formatted, so this works whether or not it is called from inside
            an ``except`` block.
        notebook_name: Optional label for the notebook. When omitted, the
            ``JPY_SESSION_NAME`` environment variable is used if set (recent
            Jupyter servers export it to kernels -- TODO confirm on the target
            platform), otherwise ``"<unknown notebook>"``.
    """
    import os  # local import keeps this helper self-contained

    try:
        # A kernel has no portable API for its notebook's name, so accept it
        # explicitly and fall back to the environment instead of failing.
        if notebook_name is None:
            notebook_name = os.environ.get("JPY_SESSION_NAME", "<unknown notebook>")

        current_time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Format from the exception object itself; traceback.format_exc() only
        # sees the exception currently being handled (if any).
        traceback_str = "".join(
            traceback.format_exception(type(exception), exception, exception.__traceback__)
        )

        logging.error(f"Error in notebook: {notebook_name}")
        logging.error(f"Time of error: {current_time_str}")
        logging.error(f"Error message: {str(exception)}")
        logging.error("Traceback:\n" + traceback_str)

    except Exception as e:
        # Logging is best effort: report the failure but never raise from here.
        print(f"An error occurred while logging: {str(e)}")

def recommendation_query():
    """Show customers who bought products "A" and "B" but never "C".

    Reads the notebook-level ``orders_df`` and ``customers_df`` DataFrames,
    counts each customer's orders of products A, B and C, keeps customers with
    at least one A, at least one B and zero C, joins in their names and shows
    the result ordered by ``customer_id``. Errors are passed to
    ``log_error_and_info`` and not re-raised, so the cell keeps running.
    """
    # Spark's aggregate ``sum`` is required here: Python's built-in ``sum``
    # tries to iterate a ``Column`` and raises TypeError. The module alias
    # avoids shadowing the built-in.
    from pyspark.sql import functions as F

    try:
        # One row per customer with the number of orders for each product of
        # interest; when(...).otherwise(0) turns every order into a 0/1 flag.
        product_counts_df = orders_df.groupBy("customer_id").agg(
            F.sum(F.when(F.col("product_name") == "A", 1).otherwise(0)).alias("A_count"),
            F.sum(F.when(F.col("product_name") == "B", 1).otherwise(0)).alias("B_count"),
            F.sum(F.when(F.col("product_name") == "C", 1).otherwise(0)).alias("C_count"),
        )

        # Bought A and B, never C; then attach customer names.
        result_df = (
            product_counts_df
            .filter((F.col("A_count") > 0) & (F.col("B_count") > 0) & (F.col("C_count") == 0))
            .select("customer_id")
            .join(customers_df, "customer_id", "inner")
            .orderBy("customer_id")
        )

        result_df.show()

    except Exception as e:
        # Record full error details instead of failing the cell.
        log_error_and_info(e)


# Call the function to execute the query
recommendation_query()


+-----------+-------------+
|customer_id|customer_name|
+-----------+-------------+
|          3|    Elizabeth|
+-----------+-------------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split

# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.appName("PatientsTable").getOrCreate()

# Sample Patients rows: (patient_id, patient_name, conditions), where
# `conditions` is a space-separated list of condition codes (possibly empty).
data = [
    (1, "Daniel", "YFEV COUGH"),
    (2, "Alice", ""),
    (3, "Bob", "DIAB100 MYOP"),
    (4, "George", "ACNE DIAB100"),
    (5, "Alain", "DIAB201"),
]

# Only column names are given, so Spark infers the column types from the values.
patients_df = spark.createDataFrame(
    data=data,
    schema=["patient_id", "patient_name", "conditions"],
)

# Display the table so the sample data can be eyeballed.
print("Patients Table:")
patients_df.show()

Patients Table:
+----------+------------+------------+
|patient_id|patient_name|  conditions|
+----------+------------+------------+
|         1|      Daniel|  YFEV COUGH|
|         2|       Alice|            |
|         3|         Bob|DIAB100 MYOP|
|         4|      George|ACNE DIAB100|
|         5|       Alain|     DIAB201|
+----------+------------+------------+



In [0]:
# A Type I Diabetes code starts with "DIAB1". Because `conditions` holds
# space-separated codes, such a code is either the very first token or is
# preceded by a space; matching only those two positions avoids false hits on
# codes that merely contain "DIAB1" in the middle.
type1_diabetes_patients_df = patients_df.where(
    col("conditions").like("DIAB1%")  # first code in the list
    | col("conditions").like("% DIAB1%")  # any later code
)

# Keep the Patients columns in their original order.
result_df = type1_diabetes_patients_df.select("patient_id", "patient_name", "conditions")

print("Patients with Type I Diabetes (DIAB1 prefix):")
result_df.show()

Patients with Type I Diabetes (DIAB1 prefix):
+----------+------------+------------+
|patient_id|patient_name|  conditions|
+----------+------------+------------+
|         3|         Bob|DIAB100 MYOP|
|         4|      George|ACNE DIAB100|
+----------+------------+------------+

