In [0]:
# Echo the `spark` object. This cell runs before the notebook's own
# SparkSession.builder call, so it relies on the runtime having already
# defined `spark` (presumably a Databricks-style environment -- TODO confirm).
spark

In [0]:
from pyspark.sql import SparkSession

# Obtain the Spark session used by every later cell, registered under the
# application name "Environmental Reporting".
spark = (
    SparkSession.builder
    .appName("Environmental Reporting")
    .getOrCreate()
)

In [0]:
!pip install confluent-kafka


Collecting confluent-kafka
  Downloading confluent_kafka-2.4.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.0 MB)
[?25l[K     |                                | 10 kB 15.4 MB/s eta 0:00:01[K     |▏                               | 20 kB 4.8 MB/s eta 0:00:01[K     |▎                               | 30 kB 7.0 MB/s eta 0:00:01[K     |▎                               | 40 kB 4.2 MB/s eta 0:00:01[K     |▍                               | 51 kB 4.4 MB/s eta 0:00:01[K     |▌                               | 61 kB 5.2 MB/s eta 0:00:01[K     |▋                               | 71 kB 5.4 MB/s eta 0:00:01[K     |▋                               | 81 kB 6.1 MB/s eta 0:00:01[K     |▊                               | 92 kB 5.1 MB/s eta 0:00:01[K     |▉                               | 102 kB 5.2 MB/s eta 0:00:01[K     |█                               | 112 kB 5.2 MB/s eta 0:00:01[K     |█                               | 122 kB 5.2 MB/s eta 0:00:01[K     |█        

In [0]:
import pyspark.sql.functions as F
from  pyspark.sql.functions import col, struct, to_json
from pyspark.sql.types import StructField, StructType, StringType, MapType

# COMMAND ----------

# Open the "illegal_dumping" Kafka topic as a streaming source with
# startingOffsets set to "earliest". The broker connection uses SASL_SSL with
# the PLAIN mechanism.
# NOTE(review): the JAAS string below carries placeholder credentials; real
# values should be injected from a secret store rather than pasted here.
kafkaDF = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "abc2.us-west4.gcp.confluent.cloud:9092")
    .option("subscribe", "illegal_dumping")
    .option("startingOffsets", "earliest")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", """kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="yourusername" password="yourpassword";""")
    .load()
)


In [0]:
# Keep only the message key and value, each cast to STRING, so the payload
# can be parsed as text by the later cells.
processedDF = kafkaDF.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
)

In [0]:
# Render the string-cast key/value stream in the notebook for inspection.
display(processedDF)

key,value
key,"{'incident_type': 'Illegal Dumping', 'location': 'Chicago', 'description': 'Illegal dumping of electronic waste in a residential neighborhood, exposing residents to harmful chemicals and heavy metals.', 'contact': ''}"
key,"{'incident_type': 'Illegal Dumping', 'location': 'Boston', 'description': 'Dumping of medical waste, including syringes and pharmaceutical containers, near a riverbank, posing health risks to the community and wildlife', 'contact': ''}"
key,"{'incident_type': 'Illegal Dumping', 'location': 'Houston, Texas', 'description': 'Dumping of medical waste, including syringes and pharmaceutical containers, near a riverbank, posing health risks to the community and wildlife', 'contact': ''}"
key,"{'incident_type': 'Illegal Dumping', 'location': 'Chicago, Illinois', 'description': 'Illegal dumping of electronic waste in a residential neighborhood, exposing residents to harmful chemicals and heavy metals.', 'contact': ''}"
key,"{'incident_type': 'Illegal Dumping', 'location': 'Arlington, Texas', 'description': 'This is an absolute DISASTER! Raw sewage is overflowing into the streets, causing a public health crisis. This is UNACCEPTABLE!', 'contact': ''}"
key,"{'incident_type': 'Illegal Dumping', 'location': 'Colorado', 'description': 'Oversized furniture blocking the sidewalk, making it difficult for pedestrians, especially those with disabilities, to navigate.', 'contact': ''}"
key,"{'incident_type': 'Illegal Dumping', 'location': 'New York', 'description': 'A refrigerator overflowing with rotting food dumped behind a grocery store, attracting pests and spreading a putrid odor.', 'contact': ''}"
key,"{'incident_type': 'Illegal Dumping', 'location': 'New York', 'description': 'Very smell restaurant scraps outside a McDonald's', 'contact': ''}"
key,"{'incident_type': 'Illegal Dumping', 'location': 'New York', 'description': 'Illegal dumping', 'contact': ''}"
key,"{'incident_type': 'Illegal Dumping', 'location': 'New York', 'description': 'Illegal dumping 2', 'contact': ''}"


In [0]:
import pyspark.sql.functions as F
from  pyspark.sql.functions import col, struct, to_json
from pyspark.sql.types import StructField, StructType, StringType, MapType

# Expected layout of the incident report carried in each Kafka message value.
# All four fields are strings; only "description" and "contact" are declared
# nullable.
json_schema = StructType([
    StructField(field_name, StringType(), nullable=is_nullable)
    for field_name, is_nullable in (
        ("incident_type", False),
        ("location", False),
        ("description", True),
        ("contact", True),
    )
])
# COMMAND ----------
# Parse the message value against the schema and flatten out the three fields
# used downstream ("contact" is parsed but not selected).
# NOTE(review): a value that cannot be parsed appears to come back as a row of
# nulls rather than an error (see the empty row in the rendered output) --
# later cells must tolerate null location/description.
query = (
    processedDF
    .withColumn('value', F.from_json(F.col('value').cast('string'), json_schema))
    .select("value.incident_type", "value.location", "value.description")
)
display(query)

incident_type,location,description
Illegal Dumping,Chicago,"Illegal dumping of electronic waste in a residential neighborhood, exposing residents to harmful chemicals and heavy metals."
Illegal Dumping,Boston,"Dumping of medical waste, including syringes and pharmaceutical containers, near a riverbank, posing health risks to the community and wildlife"
Illegal Dumping,"Houston, Texas","Dumping of medical waste, including syringes and pharmaceutical containers, near a riverbank, posing health risks to the community and wildlife"
Illegal Dumping,"Chicago, Illinois","Illegal dumping of electronic waste in a residential neighborhood, exposing residents to harmful chemicals and heavy metals."
Illegal Dumping,"Arlington, Texas","This is an absolute DISASTER! Raw sewage is overflowing into the streets, causing a public health crisis. This is UNACCEPTABLE!"
Illegal Dumping,Colorado,"Oversized furniture blocking the sidewalk, making it difficult for pedestrians, especially those with disabilities, to navigate."
Illegal Dumping,New York,"A refrigerator overflowing with rotting food dumped behind a grocery store, attracting pests and spreading a putrid odor."
,,
Illegal Dumping,New York,Illegal dumping
Illegal Dumping,New York,Illegal dumping 2


In [0]:
!pip install geopy


Collecting geopy
  Downloading geopy-2.4.1-py3-none-any.whl (125 kB)
[?25l[K     |██▋                             | 10 kB 8.0 MB/s eta 0:00:01[K     |█████▎                          | 20 kB 3.7 MB/s eta 0:00:01[K     |███████▉                        | 30 kB 5.3 MB/s eta 0:00:01[K     |██████████▌                     | 40 kB 4.5 MB/s eta 0:00:01[K     |█████████████                   | 51 kB 4.2 MB/s eta 0:00:01[K     |███████████████▊                | 61 kB 5.0 MB/s eta 0:00:01[K     |██████████████████▎             | 71 kB 5.2 MB/s eta 0:00:01[K     |█████████████████████           | 81 kB 5.9 MB/s eta 0:00:01[K     |███████████████████████▌        | 92 kB 5.9 MB/s eta 0:00:01[K     |██████████████████████████▏     | 102 kB 5.0 MB/s eta 0:00:01[K     |████████████████████████████▊   | 112 kB 5.0 MB/s eta 0:00:01[K     |███████████████████████████████▍| 122 kB 5.0 MB/s eta 0:00:01[K     |████████████████████████████████| 125 kB 5.0 MB/s 
[?25hCollecting g

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Lookup table: region name -> list of full state names belonging to that
# region. It covers the 50 states plus the District of Columbia, each listed
# under exactly one region. map_state_to_region scans these lists to label an
# incident's state, so names must match the geocoder's spelling exactly.
regions_to_states = {
    'South': ['West Virginia', 'District of Columbia', 'Maryland', 'Virginia',
              'Kentucky', 'Tennessee', 'North Carolina', 'Mississippi',
              'Arkansas', 'Louisiana', 'Alabama', 'Georgia', 'South Carolina',
              'Florida', 'Delaware'],
    'Southwest': ['Arizona', 'New Mexico', 'Oklahoma', 'Texas'],
    'West': ['Washington', 'Oregon', 'California', 'Nevada', 'Idaho', 'Montana',
             'Wyoming', 'Utah', 'Colorado', 'Alaska', 'Hawaii'],
    'Midwest': ['North Dakota', 'South Dakota', 'Nebraska', 'Kansas', 'Minnesota',
                'Iowa', 'Missouri', 'Wisconsin', 'Illinois', 'Michigan', 'Indiana',
                'Ohio'],
    'Northeast': ['Maine', 'Vermont', 'New York', 'New Hampshire', 'Massachusetts',
                  'Rhode Island', 'Connecticut', 'New Jersey', 'Pennsylvania']
}

#from geotext import GeoText
from geopy.geocoders import Nominatim

# Initialize a Nominatim geocoder
#geolocator = Nominatim(user_agent="my_application")

# Define a UDF to extract state names from location text

def extract_state(location_text):
    """Geocode free-text ``location_text`` and return the state it falls in.

    Args:
        location_text: Place description such as "Chicago" or
            "Houston, Texas". May be None or blank.

    Returns:
        The state name reported by the geocoder, or "Unknown" when the input
        is missing/blank, the geocoder finds no match, or no state can be
        derived from the match.
    """
    # Null or blank locations cannot be geocoded; skip the network round trip
    # instead of sending a meaningless query.
    if not location_text or not location_text.strip():
        return "Unknown"

    # The client is created per call, as before, rather than shared globally.
    geolocator = Nominatim(user_agent="my_application")
    # addressdetails=True asks Nominatim for a structured address breakdown,
    # which is more reliable than guessing a position in the display string.
    location = geolocator.geocode(location_text, addressdetails=True)
    if not location:
        return "Unknown"

    state = location.raw.get('address', {}).get('state')
    if state:
        return state

    # Fallback: the second-to-last component of the comma-separated display
    # name. Guard the index so a single-component name (e.g. just a country)
    # yields "Unknown" instead of raising IndexError.
    parts = [part.strip() for part in location.raw.get('display_name', '').split(',')]
    if len(parts) >= 2 and parts[-2]:
        return parts[-2]
    return "Unknown"
    
# Create a UDF to map states to regions
@udf(StringType())
def map_state_to_region(location):
    """Return the region whose state list contains the state geocoded from ``location``.

    The state comes from extract_state (whitespace-trimmed); "Unknown" is
    returned when it is not listed under any region in regions_to_states.
    """
    state_name = extract_state(location).strip()
    matching_regions = (
        region_name
        for region_name, member_states in regions_to_states.items()
        if state_name in member_states
    )
    # First hit wins; fall back to "Unknown" when no region lists the state.
    return next(matching_regions, "Unknown")

# Label every incident with its region. The UDF issues a geocoder lookup for
# each row's location text.
df_with_region = query.withColumn(
    "region",
    map_state_to_region(query.location),
)

display(df_with_region)


incident_type,location,description,region
Illegal Dumping,Chicago,"Illegal dumping of electronic waste in a residential neighborhood, exposing residents to harmful chemicals and heavy metals.",Midwest
Illegal Dumping,Boston,"Dumping of medical waste, including syringes and pharmaceutical containers, near a riverbank, posing health risks to the community and wildlife",Northeast
Illegal Dumping,"Houston, Texas","Dumping of medical waste, including syringes and pharmaceutical containers, near a riverbank, posing health risks to the community and wildlife",Southwest
Illegal Dumping,"Chicago, Illinois","Illegal dumping of electronic waste in a residential neighborhood, exposing residents to harmful chemicals and heavy metals.",Midwest
Illegal Dumping,"Arlington, Texas","This is an absolute DISASTER! Raw sewage is overflowing into the streets, causing a public health crisis. This is UNACCEPTABLE!",Southwest
Illegal Dumping,Colorado,"Oversized furniture blocking the sidewalk, making it difficult for pedestrians, especially those with disabilities, to navigate.",West
Illegal Dumping,New York,"A refrigerator overflowing with rotting food dumped behind a grocery store, attracting pests and spreading a putrid odor.",Northeast
,,,Unknown
Illegal Dumping,New York,Illegal dumping,Northeast
Illegal Dumping,New York,Illegal dumping 2,Northeast


In [0]:
!pip install vaderSentiment

Collecting vaderSentiment
  Downloading vaderSentiment-3.3.2-py2.py3-none-any.whl (125 kB)
[?25l[K     |██▋                             | 10 kB 16.3 MB/s eta 0:00:01[K     |█████▏                          | 20 kB 11.9 MB/s eta 0:00:01[K     |███████▉                        | 30 kB 6.9 MB/s eta 0:00:01[K     |██████████▍                     | 40 kB 4.9 MB/s eta 0:00:01[K     |█████████████                   | 51 kB 5.3 MB/s eta 0:00:01[K     |███████████████▋                | 61 kB 5.7 MB/s eta 0:00:01[K     |██████████████████▏             | 71 kB 5.6 MB/s eta 0:00:01[K     |████████████████████▉           | 81 kB 5.8 MB/s eta 0:00:01[K     |███████████████████████▍        | 92 kB 6.5 MB/s eta 0:00:01[K     |██████████████████████████      | 102 kB 5.7 MB/s eta 0:00:01[K     |████████████████████████████▋   | 112 kB 5.7 MB/s eta 0:00:01[K     |███████████████████████████████▏| 122 kB 5.7 MB/s eta 0:00:01[K     |████████████████████████████████| 125 kB 5.7 M

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Build one VADER analyzer at module level; analyze_sentiment reads it as a
# global for every description it scores.
analyzer = SentimentIntensityAnalyzer()

# Define a function to perform sentiment analysis on the description text
def analyze_sentiment(description):
    """Classify a report's severity from how negative its description reads.

    Args:
        description: Free-text incident description; may be None or empty.

    Returns:
        "High" when VADER's ``neg`` score is at least 0.4, "Medium" when it is
        at least 0.2 (and below 0.4), otherwise "Low". A missing or empty
        description is reported as "Low".
    """
    # Nothing to score: return the lowest severity directly rather than
    # scoring a placeholder string (which also came out as "Low").
    if not description:
        return "Low"

    # Only the negative component of the VADER scores drives the severity;
    # the compound/neutral/positive components are ignored.
    negativity = analyzer.polarity_scores(description)['neg']

    if negativity >= 0.4:
        return "High"
    if negativity >= 0.2:
        return "Medium"
    return "Low"

# Wrap analyze_sentiment as a Spark UDF that returns a string label
sentiment_udf = udf(analyze_sentiment, StringType())

# Add a "severity" column by applying the UDF to the "description" column of
# `query` (the parsed incident DataFrame). Despite its name,
# processedDF_with_severity is built from `query`, not directly from processedDF.
processedDF_with_severity = query.withColumn("severity", sentiment_udf("description"))

# Render the DataFrame with the added severity column
display(processedDF_with_severity)

incident_type,location,description,severity
Illegal Dumping,Chicago,"Illegal dumping of electronic waste in a residential neighborhood, exposing residents to harmful chemicals and heavy metals.",High
Illegal Dumping,Boston,"Dumping of medical waste, including syringes and pharmaceutical containers, near a riverbank, posing health risks to the community and wildlife",Medium
Illegal Dumping,"Houston, Texas","Dumping of medical waste, including syringes and pharmaceutical containers, near a riverbank, posing health risks to the community and wildlife",Medium
Illegal Dumping,"Chicago, Illinois","Illegal dumping of electronic waste in a residential neighborhood, exposing residents to harmful chemicals and heavy metals.",High
Illegal Dumping,"Arlington, Texas","This is an absolute DISASTER! Raw sewage is overflowing into the streets, causing a public health crisis. This is UNACCEPTABLE!",High
Illegal Dumping,Colorado,"Oversized furniture blocking the sidewalk, making it difficult for pedestrians, especially those with disabilities, to navigate.",Medium
Illegal Dumping,New York,"A refrigerator overflowing with rotting food dumped behind a grocery store, attracting pests and spreading a putrid odor.",Low
,,,Low
Illegal Dumping,New York,Illegal dumping,High
Illegal Dumping,New York,Illegal dumping 2,High


In [0]:
# Expose the severity-labelled streaming DataFrame to Spark SQL under the
# view name "incident_reports".
processedDF_with_severity.createOrReplaceTempView("incident_reports")

# SQL text for the two aggregations over that view:
#   - total_incidents_query: incident count per incident_type
#   - severity_incidents_query: incident count per (incident_type, severity)
total_incidents_query = """
    SELECT 
        incident_type,
        COUNT(*) AS total_incidents
    FROM 
        incident_reports
    GROUP BY 
        incident_type
"""

severity_incidents_query = """
    SELECT 
        incident_type,
        severity,
        COUNT(*) AS severity_incidents
    FROM 
        incident_reports
    GROUP BY 
        incident_type, severity
"""

# Build a DataFrame for each aggregation. Only the per-severity result is
# displayed in this cell; total_incidents_df is referenced solely by the
# commented-out join further down.
total_incidents_df = spark.sql(total_incidents_query)
severity_incidents_df = spark.sql(severity_incidents_query)

display(severity_incidents_df)

# Join the aggregations
#joined_df = total_incidents_df.join(severity_incidents_df, on='incident_type', how='left')

# Calculate severity percentage
#severity_percentage_df = joined_df.withColumn("severity_percentage", 
                                              #(joined_df["severity_incidents"] / joined_df["total_incidents"]) * 100)

#display(severity_percentage_df)

# Start the streaming query to output the results
#query = severity_percentage_df.writeStream \
 #   .outputMode("append") \
   # .format("console") \
    #.start()

# Await termination
#query.awaitTermination()

incident_type,severity,severity_incidents
Illegal Dumping,Low,1
Illegal Dumping,High,3
Illegal Dumping,Medium,3


In [0]:
# Tally how many incidents fall under each severity label.
severity_counts = (
    processedDF_with_severity
    .groupBy("severity")
    .count()
)
display(severity_counts)

severity,count
High,3
Low,1
Medium,3


In [0]:
# Incident counts per (severity, location) pair, sorted by severity and then
# by count, both in descending order.
severity_locations = (
    processedDF_with_severity
    .groupBy("severity", "location")
    .count()
    .orderBy("severity", "count", ascending=[False, False])
)
display(severity_locations)

severity,location,count
Medium,Colorado,1
Medium,Boston,1
Medium,"Houston, Texas",1
Low,New York,1
High,"Chicago, Illinois",1
High,"Arlington, Texas",1
High,Chicago,1


In [0]:
# Name of the Delta table the stream appends to. Despite the variable's name
# this is a table identifier (backtick-quoted), not a filesystem path.
delta_table_path = "`result_delta_table`"

# Directory where the streaming query keeps its checkpoint data
checkpoint_location = "/FileStore/tables/checkpoints"

# Micro-batch interval, passed to trigger(processingTime=...) below. This was
# previously a dead variable ("processing=30 seconds") that disagreed with
# both its comment and the interval actually used; the effective interval
# remains 10 seconds.
trigger = "10 seconds"

# Start the streaming write: append each micro-batch of severity-labelled
# incidents to the Delta table, with checkpointing and the interval above.
streaming_query = (
    processedDF_with_severity.writeStream
    .outputMode("append")
    .option("checkpointLocation", checkpoint_location)
    .trigger(processingTime=trigger)
    .format("delta")
    .toTable(delta_table_path)
)


In [0]:
# Open the "result_delta_table" table as a streaming source and render it;
# this is the same table name the streaming write cell targets.
deltaDF = spark.readStream.table("result_delta_table")
display(deltaDF)

incident_type,location,description,severity
Deforestation,Iowa,A lot of trees fell off for logging,Low
Deforestation,Iowa,xyzbvjbndkf,Low


In [0]:
%sql
-- One-off (non-streaming) query returning every row currently stored in the
-- table that the streaming write cell appends to.
SELECT * FROM result_delta_table;

incident_type,location,description,severity
Deforestation,Iowa,A lot of trees fell off for logging,Low
Deforestation,Iowa,xyzbvjbndkf,Low
