In [1]:
## Consumer
from kafka import KafkaConsumer
from pyspark.sql import SparkSession

In [2]:
import os
import smtplib
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

from kafka import KafkaConsumer
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Initialize SparkContext
sc = SparkContext("local", "MilkPropertiesConsumer")

# Kafka server and topic information
bootstrap_servers = 'w01.itversity.com:9092'
topic = 'MilkPro_Data_Pipeline1'

# Create a Kafka consumer
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=bootstrap_servers,
    group_id='milk_properties_group'
)

# Parsed rows, one tuple per successfully decoded message
data_list = []

# Global counter for data fetch (counts only rows that were parsed and stored)
data_counter = 0

# Consume and process the data. The try/finally guarantees the consumer is
# closed even if the loop is interrupted or an unexpected error escapes.
try:
    for message in consumer:
        try:
            # Each message is expected to be a UTF-8, comma-separated record:
            # milk_id, batch_id, farm_id, area, timestamp, fat %, protein %,
            # lactose %, temperature
            values = message.value.decode('utf-8').split(',')
            record = (
                int(values[0]),      # milk_id
                str(values[1]),      # batch_id
                str(values[2]),      # farm_id
                str(values[3]),      # area
                str(values[4]),      # timestamp (kept as text)
                float(values[5]),    # fat_percentage
                float(values[6]),    # protein_percentage
                float(values[7]),    # lactose_percentage
                int(values[8]),      # temperature
            )
        except (AttributeError, UnicodeDecodeError, ValueError, IndexError) as exc:
            # A single malformed message (empty payload, bad encoding, too few
            # fields, non-numeric field) must not abort the whole run.
            print(f"Skipping malformed message at offset {message.offset}: {exc}")
            continue

        # Append the processed data as a tuple to the list
        data_list.append(record)

        # Increment the data counter
        data_counter += 1

        # Break the loop if enough data has been processed (e.g., 1000 rows)
        if data_counter >= 1000:
            break
finally:
    # Close the Kafka consumer
    consumer.close()

# Obtain the Spark session (created on first use, reused otherwise)
spark = (
    SparkSession.builder
    .appName("MilkPropertiesConsumer")
    .getOrCreate()
)

# Schema of one milk reading: every column is nullable, listed in the same
# order as the tuples stored in data_list
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("milk_id", IntegerType()),
        ("batch_id", StringType()),
        ("farm_id", StringType()),
        ("area", StringType()),
        ("timestamp", StringType()),
        ("fat_percentage", FloatType()),
        ("protein_percentage", FloatType()),
        ("lactose_percentage", FloatType()),
        ("temperature", IntegerType()),
    )
])

# Turn the collected tuples into a DataFrame that follows the schema above
df = spark.createDataFrame(data_list, schema=schema)

# Uncomment to inspect the full DataFrame
#print("Data Frame:")
#df.show(truncate=False)

# Areas whose readings are accepted; any other area value is treated as bad data
valid_areas = ('Karad', 'Navarasta', 'Patan', 'Koyana')

# Group the rows on the driver first and build ONE DataFrame per group
# afterwards. Creating a single-row DataFrame and a union for every row builds
# a very deep query plan, and the previous bad-data branch looked up
# 'Bad Data' in the wrong dictionary, so invalid-area rows replaced the
# accumulated bad data instead of being appended to it.
rows_by_area = {}
bad_rows = []

for row in df.collect():
    area = row['area']
    # A row is good only if the temperature is within the desired range
    # (63 to 75, inclusive) AND the area is one of the known areas
    if 63 <= row['temperature'] <= 75 and area in valid_areas:
        rows_by_area.setdefault(area, []).append(row)
    else:
        bad_rows.append(row)

# One DataFrame per area that actually received rows (keyed by area name)
area_dataframes = {
    area_name: spark.createDataFrame(area_rows, schema)
    for area_name, area_rows in rows_by_area.items()
}

# The 'Bad Data' key exists only when at least one bad row was seen, which is
# what the later save/alert steps test for
bad_data_dataframes = {}
if bad_rows:
    bad_data_dataframes['Bad Data'] = spark.createDataFrame(bad_rows, schema)

# Print the contents of every per-area DataFrame
for area, area_df in area_dataframes.items():
    print(f"Data for Area: {area}")
    area_df.show(truncate=True)

# Print the contents of the bad-data DataFrame(s)
for area, bad_data_df in bad_data_dataframes.items():
    print(f"Bad Data for Area: {area}")
    bad_data_df.show(truncate=True)

# Base HDFS directory that receives the parquet output
hdfs_output_path = '/user/itv007039/warehouse/ybm_test_db.db/'

# Write each area to its own parquet location, replacing any previous output
for area, area_df in area_dataframes.items():
    area_df.write.mode('overwrite').parquet(f'{hdfs_output_path}{area}_data.parquet')

# Write the bad data (if any was collected) next to the per-area output
if 'Bad Data' in bad_data_dataframes:
    bad_data_df = bad_data_dataframes['Bad Data']
    bad_data_df.write.mode('overwrite').parquet(f'{hdfs_output_path}bad_data.parquet')

# All Spark work is done; release the session
spark.stop()

# Email Alert
def mail_details():
    """Return the plain-text body of the bad-data alert email.

    The text starts with a newline, every non-empty line is indented by four
    spaces, and the text ends with a newline followed by four spaces.
    """
    indent = " " * 4
    lines = (
        "",
        indent + "Dear Manager,",
        "",
        indent + "We have detected bad data in the milk processing unit.",
        indent + "You are requested to take necessary action.",
        "",
        indent + "Regards,",
        indent + "Milk Processing Unit",
        indent,
    )
    return "\n".join(lines)
def send_mail(sender_email, receiver_email, subject, body, app_password, timeout=30):
    """Send a plain-text email through Gmail's SMTP-over-SSL endpoint.

    Args:
        sender_email: Address used both as the ``From`` header and as the
            SMTP login name.
        receiver_email: Address used as the ``To`` header and envelope
            recipient.
        subject: Subject line of the message.
        body: Plain-text message body.
        app_password: Password used to log in as ``sender_email``.
        timeout: Seconds to wait on the SMTP connection before giving up, so
            a stalled network connection cannot block the caller forever.

    Returns:
        True if the message was handed to the server without an error,
        False if anything went wrong (the error is printed, not raised).
    """
    import ssl

    msg = MIMEMultipart()
    msg['From'] = sender_email
    msg['To'] = receiver_email
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))

    context = ssl.create_default_context()
    try:
        with smtplib.SMTP_SSL("smtp.gmail.com", 465, context=context, timeout=timeout) as server:
            server.login(sender_email, app_password)
            server.sendmail(sender_email, receiver_email, msg.as_string())
    except Exception as e:
        # Sending the alert is best-effort: report the problem and tell the
        # caller it failed instead of crashing the pipeline.
        print("An error occurred while sending the email:")
        print(e)
        return False

    print("Email sent successfully!")
    return True
        

# Set up email details
sender_email = "yogesh.ybm999@gmail.com"  # Replace with your Gmail email address
receiver_email = "yogesh.yp999@gmail.com"  # Replace with the manager's email address
# The app password is a secret and must not live in source control; it is
# read from the GMAIL_APP_PASSWORD environment variable instead.
app_password = os.environ.get("GMAIL_APP_PASSWORD")
subject = "Bad Data Alert"

# Send email alert for bad data (the 'Bad Data' key exists only when bad rows
# were collected)
if 'Bad Data' in bad_data_dataframes:
    if app_password:
        email_body = mail_details()
        send_mail(sender_email, receiver_email, subject, email_body, app_password)
        print("Email sent for bad data alert.")
    else:
        # Without credentials the login would fail anyway; say so explicitly.
        print("Bad data detected, but GMAIL_APP_PASSWORD is not set. No email sent.")
else:
    print("No bad data detected. No email sent.")

Data for Area: Patan
+-------+-------------------+-------+-----+----------+--------------+------------------+------------------+-----------+
|milk_id|           batch_id|farm_id| area| timestamp|fat_percentage|protein_percentage|lactose_percentage|temperature|
+-------+-------------------+-------+-----+----------+--------------+------------------+------------------+-----------+
|    955|M_PAT207_15-06-2023| PAT207|Patan|15-06-2023|          0.19|              3.63|              4.56|         68|
|    153|M_PAT221_23-06-2023| PAT221|Patan|23-06-2023|          6.21|               3.8|              4.59|         70|
|    771|M_PAT219_31-05-2023| PAT219|Patan|31-05-2023|           0.4|              4.07|              5.64|         64|
|    476|M_PAT280_19-07-2023| PAT280|Patan|19-07-2023|          2.33|              3.21|              4.99|         74|
|    356|M_PAT254_06-05-2023| PAT254|Patan|06-05-2023|          2.65|              4.16|              5.64|         75|
|    369|M_PAT223_1