<a href="https://colab.research.google.com/github/rmitchell745/notification_app/blob/main/weather_app.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Load_Config.py

In [6]:
%%writefile Load_Config.py
import os
import yaml
import os
from Logger import logger # Import the logger

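# This module loads the configuration dictionary from a YAML file (located via a path relative to the current working directory) and builds the User objects and the list of unique zipcodes used by the other .py modules. (The EmailConfig singleton itself is defined in Send_Email_SMS.py.)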

def load_config(config_path=None):
    """Load the application configuration from a YAML file.

    Args:
        config_path: Optional path to the YAML file. When omitted, the
            original default ``<cwd>/../config/weather_config.yml`` is used,
            so existing callers that pass no argument behave as before.

    Returns:
        The parsed configuration dictionary, or ``None`` when the file is
        missing, cannot be read, is not valid YAML, or does not contain a
        top-level mapping (for example an empty file).
    """
    if config_path is None:
        config_path = os.path.join(os.getcwd(), "..", "config", 'weather_config.yml')

    try:
        with open(config_path, 'r', encoding='utf-8') as file:
            config = yaml.safe_load(file)
    except FileNotFoundError:
        logger.error(f"Error: Config file not found at {config_path}")
        return None
    except OSError as e:
        # Covers permission problems, a directory passed as the path, etc.
        logger.error(f"Error: Could not read config file at {config_path}: {e}")
        return None
    except yaml.YAMLError as e:
        logger.error(f"Error: Config file at {config_path} is not valid YAML: {e}")
        return None

    # safe_load returns None for an empty file and may return a list or a
    # scalar; callers index the result like a dict, so reject anything else
    # here instead of reporting a misleading "loaded successfully".
    if not isinstance(config, dict):
        logger.error(f"Error: Config file at {config_path} does not contain a mapping.")
        return None

    logger.info(f"Config file loaded successfully from {config_path}")
    return config


# weather_config.yml contains an `email_settings` dictionary (used with smtplib) and a `users` list holding one dictionary per user.





class User:
    """One notification recipient, as described by an entry of the config's user list.

    Attributes:
        name: Display name used in greetings and log lines.
        ph_nmbr: Phone number used to build the SMS gateway address.
        carrier: Carrier name, looked up in the carrier-to-gateway table.
        zipcodes: The zipcodes this user wants forecasts for.
    """

    def __init__(self, name, ph_nmbr, carrier, zipcodes):
        # The four values are stored exactly as given (no copying or validation).
        self.name, self.ph_nmbr = name, ph_nmbr
        self.carrier, self.zipcodes = carrier, zipcodes
        logger.info(f"User object created for {self.name}")


# now to finish configuration and initialization we need to return a list of User Instances and a list of unique zipcodes

def Load_users(config):
    """Build ``User`` objects from the ``users`` list of the configuration.

    An entry is accepted only when it is a dictionary that has the keys
    ``name``, ``ph_nmbr``, ``carrier`` and ``zipcodes`` and whose ``zipcodes``
    value is a list; every other entry is logged and skipped.

    Args:
        config: The loaded configuration (may be ``None`` or empty).

    Returns:
        A list of ``User`` instances, empty when nothing valid was found.
    """
    required_keys = ('name', 'ph_nmbr', 'carrier', 'zipcodes')
    user_list = []

    if config and 'users' in config and isinstance(config['users'], list):
        logger.info("Loading users from config.")
        for user in config['users']:
            entry_is_valid = (
                isinstance(user, dict)
                and all(key in user for key in required_keys)
                and isinstance(user['zipcodes'], list)
            )
            if not entry_is_valid:
                logger.warning(f"Skipping invalid user entry in config: {user}")
                continue

            name = user['name']
            user_list.append(User(name, user['ph_nmbr'], user['carrier'], user['zipcodes']))
            logger.info(f"Loaded user: {name}")
    elif config:
        # A non-empty config without a usable 'users' list is worth a warning;
        # a missing/empty config is reported by the caller instead.
        logger.warning("'users' key not found or not a list in config file.")

    logger.info(f"Finished loading {len(user_list)} users.")
    return user_list

def Load_zipcodes(user_list):
    """Collect the distinct zipcodes requested by all users.

    Args:
        user_list: Iterable of objects exposing a ``zipcodes`` iterable.

    Returns:
        A list with each zipcode exactly once, in the order it was first
        seen. (De-duplicating through ``set`` gave an arbitrary order that
        could differ between runs, which made fetch order and logs
        non-reproducible.)
    """
    # dict keys are unique and keep insertion order, so this de-duplicates
    # while preserving first-seen order.
    uniq_zipcodes = list(dict.fromkeys(
        zipcode for user in user_list for zipcode in user.zipcodes
    ))
    logger.info(f"Found {len(uniq_zipcodes)} unique zipcodes.")

    return uniq_zipcodes

Writing Load_Config.py


# Send_Email_SMS.py



In [5]:
%%writefile Send_Email_SMS.py
import smtplib
from email.mime.text import MIMEText
from Logger import logger

# define an email config class
class EmailConfig:
    """Singleton holding the SMTP settings used to send messages.

    Obtain the instance through ``EmailConfig.get_instance(config)``, where
    ``config`` is a dictionary with the keys ``smtp_server``, ``smtp_port``,
    ``sender_email`` and ``sender_password``. After construction the
    ``_initialized`` attribute tells whether those settings were accepted.
    """
    __instance = None

    # Keys that must all be present for the settings to be usable.
    _REQUIRED_KEYS = ('smtp_server', 'smtp_port', 'sender_email', 'sender_password')

    def __new__(cls, *args, **kwargs):
        # Python hands the constructor arguments to __new__ as well as to
        # __init__. A __new__ that accepts only ``cls`` makes
        # ``EmailConfig(config)`` raise TypeError, so accept and ignore them.
        if cls.__instance is None:
            cls.__instance = super(EmailConfig, cls).__new__(cls)
            logger.info("EmailConfig singleton instance created.")
        return cls.__instance

    def __init__(self, config):
        """Store the SMTP settings from *config* unless already initialized.

        A previous *failed* initialization (``_initialized`` is False) does
        not block a later attempt with a complete config.
        """
        if getattr(self, '_initialized', False):
            return

        if config and all(key in config for key in self._REQUIRED_KEYS):
            self.smtp_server = config['smtp_server']
            self.smtp_port = config['smtp_port']
            self.sender_email = config['sender_email']
            self.sender_password = config['sender_password']
            self._initialized = True
            logger.info("EmailConfig instance initialized successfully.")
        else:
            logger.error("Missing required email configuration keys in config.")
            self._initialized = False

    @classmethod
    def get_instance(cls, config=None):
        """Return the singleton, creating it from *config* on first use.

        Args:
            config: Email settings dictionary. Required on the first call;
                on later calls it is only used if the earlier
                initialization did not succeed.

        Returns:
            The single ``EmailConfig`` instance.

        Raises:
            ValueError: If no instance exists yet and *config* is ``None``.
        """
        if cls.__instance is None and config is None:
            logger.error("Config must be provided on first initialization of EmailConfig.")
            raise ValueError("Config must be provided on first initialization")

        if config is not None:
            # __new__ returns (and stores) the shared instance; __init__ is a
            # no-op once the instance has been initialized successfully.
            cls(config)
        return cls.__instance




# send the email function
def send_email(ssl_context, sender_email, ph_nmbr, carrier, subject, body, carrier_sms_protocol):
    """Send *body* as a text message through a carrier's e-mail-to-SMS gateway.

    Args:
        ssl_context: A connected, logged-in SMTP client; only its
            ``sendmail(from_addr, to_addr, msg)`` method is used.
        sender_email: Envelope and header sender address.
        ph_nmbr: Recipient phone number (local part of the gateway address).
        carrier: Carrier name used as key into *carrier_sms_protocol*.
        subject: Subject line of the message.
        body: Plain-text message body.
        carrier_sms_protocol: Mapping of carrier name to gateway domain.

    Returns:
        None. Unknown carriers and delivery failures are logged, not raised.
    """
    carrier_domain = carrier_sms_protocol.get(carrier)
    if not carrier_domain:
        logger.warning(f"Unknown carrier '{carrier}'. Cannot send SMS to {ph_nmbr}.")
        return

    receiver_email = f"{ph_nmbr}@{carrier_domain}"

    try:
        # Build a real MIME message instead of a hand-written
        # "Subject: ...\n\nbody" string: it carries From/To headers and
        # MIMEText picks an encoding that keeps the wire format ASCII-safe.
        # smtplib encodes str messages with the ascii codec, so a raw string
        # containing e.g. a degree sign would fail to send.
        message = MIMEText(body)
        message["Subject"] = subject
        message["From"] = sender_email
        message["To"] = receiver_email

        ssl_context.sendmail(sender_email, receiver_email, message.as_string())
        logger.info(f"Email sent successfully to {receiver_email}")
    except Exception as e:
        # Best effort by design: one failed recipient must not abort the
        # remaining sends performed by the caller.
        logger.error(f"Failed to send email to {receiver_email}: {e}")

Writing Send_Email_SMS.py


# Get_Weather_Data.py

In [4]:
%%writefile Get_Weather_Data.py
import requests
import os
from Logger import logger

#use env variable for api keys, access openweather api for unique zipcodes and create a dictionary of 3 weather forecasts for each zipcode

# Read the OpenWeather API key once at import time; get_weather() below
# cannot build a request without it.
api_key = os.environ.get('WEATHER_API_KEY')
if not api_key:
    logger.error("WEATHER_API_KEY environment variable not set.")
    # exit() is a helper injected by the `site` module for interactive use
    # and, called without arguments, ends the process with status 0.
    # Raise SystemExit with a non-zero code so a scheduler running the app
    # can tell that the run failed.
    raise SystemExit(1)

def get_weather(uniq_zipcodes, timeout=10):
    """Fetch the OpenWeather forecast for every zipcode.

    Args:
        uniq_zipcodes: Iterable of US zipcodes to query.
        timeout: Seconds to wait for each HTTP request before giving up
            (without a timeout a stalled connection would hang the run).

    Returns:
        A dictionary mapping each zipcode to the decoded JSON response, or
        to ``{"error": "<reason>"}`` when the request for it failed.
    """
    forecast_url = "https://api.openweathermap.org/data/2.5/forecast"

    weather_data = {}
    logger.info(f"Fetching weather data for {len(uniq_zipcodes)} unique zipcodes.")
    for zipcode in uniq_zipcodes:
        # "zip" takes "<zipcode>,<country code>" and "cnt" is a separate
        # query parameter; the earlier "zip=<zip>,cnt=3,us" form placed cnt
        # inside the zip value. Passing params lets requests do the encoding.
        params = {"zip": f"{zipcode},us", "cnt": 3, "appid": api_key}
        try:
            response = requests.get(forecast_url, params=params, timeout=timeout)
            response.raise_for_status()
            weather_data[zipcode] = response.json()
            logger.info(f"Successfully fetched weather data for zipcode {zipcode}")
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON on requests
            # versions whose JSON error is not a RequestException.
            # The exception text can contain the request URL, including the
            # API key; redact it because this text is logged and is later
            # copied into the message sent to users.
            reason = str(e)
            if api_key:
                reason = reason.replace(api_key, "***")
            logger.error(f"Error fetching weather data for zipcode {zipcode}: {reason}")
            weather_data[zipcode] = {"error": f"Could not fetch weather data: {reason}"}

    return weather_data


def human_readable_weather(weather_data):
    """Turn raw per-zipcode forecast payloads into printable lines.

    Args:
        weather_data: Mapping of zipcode to either a payload with a ``list``
            of forecast entries or an ``{"error": ...}`` dictionary.

    Returns:
        Mapping of zipcode to a dictionary of ``label -> list of lines``.
        Labels are forecast dates (first token of ``dt_txt``), ``"Error"``
        for failed fetches, or ``"No forecast data available"`` when the
        payload or an individual entry is unusable.
    """
    logger.info("Converting weather data to human readable format.")
    human_readable_data = {}

    for zipcode, data in weather_data.items():
        # Failed fetch: pass the error text through under an "Error" label.
        if "error" in data:
            human_readable_data[zipcode] = {"Error": [data["error"]]}
            logger.warning(f"Skipping human readable conversion for zipcode {zipcode} due to fetch error.")
            continue

        # Payload without any forecast entries.
        if 'list' not in data or not data['list']:
            human_readable_data[zipcode] = {"No forecast data available": ["No forecast data available for this zipcode."]}
            logger.warning(f"'list' key missing or empty in weather data for zipcode {zipcode}.")
            continue

        lines_by_label = {}
        human_readable_data[zipcode] = lines_by_label

        for forecast in data['list']:
            entry_usable = 'dt_txt' in forecast and 'weather' in forecast and forecast['weather']
            if not entry_usable:
                # Record the placeholder (and warn) only once per zipcode.
                if 'No forecast data available' not in lines_by_label:
                    lines_by_label['No forecast data available'] = ["No forecast data available for this time."]
                    logger.warning(f"Missing dt_txt or weather key for forecast in zipcode {zipcode}")
                continue

            # dt_txt is split on whitespace into "<date> <time>".
            stamp_parts = forecast['dt_txt'].split()
            date = stamp_parts[0]
            time = stamp_parts[1]

            day_lines = lines_by_label.setdefault(date, [])

            if 'description' in forecast['weather'][0]:
                day_lines.append(f"{time}: {forecast['weather'][0]['description']}")
            else:
                day_lines.append(f"{time}: No description available")
                logger.warning(f"No weather description available for {zipcode} at {date} {time}")

    logger.info("Finished converting weather data to human readable format.")
    return human_readable_data


def generate_message(user, all_human_readable_data):
    """Compose the text message for one user.

    Args:
        user: Object with ``name`` and ``zipcodes`` attributes.
        all_human_readable_data: Mapping of zipcode to ``label -> lines``
            dictionaries, as returned by ``human_readable_weather``.

    Returns:
        The greeting followed by one section per zipcode of the user; a
        zipcode without data gets a "No weather data available" line.
    """
    # Collect the pieces and join once at the end.
    pieces = [f"Hello {user.name},\n\nHere is your weather forecast for the next 3 days:\n\n"]
    logger.info(f"Generating message for user: {user.name}")

    for zipcode in user.zipcodes:
        if zipcode not in all_human_readable_data:
            pieces.append(f"No weather data available for Zipcode {zipcode}\n\n")
            logger.warning(f"No weather data available in human_readable_data for user {user.name}'s zipcode {zipcode}")
            continue

        pieces.append(f"Weather for Zipcode {zipcode}:\n")
        for date, forecasts in all_human_readable_data[zipcode].items():
            pieces.append(f"Date: {date}\n")
            pieces.extend(f"{forecast}\n" for forecast in forecasts)
            pieces.append("\n")
        logger.info(f"Included weather data for zipcode {zipcode} in message for {user.name}")

    logger.info(f"Message generated for user: {user.name}")
    return "".join(pieces)

Writing Get_Weather_Data.py


# __main__.py

In [3]:
%%writefile __main__.py
import Load_Config
import Send_Email_SMS
import Get_Weather_Data
import smtplib
import yaml
import os
from Logger import logger

#main python module that is scheduled to run.

# Entry-point script (the module that is scheduled to run). Steps:
# load config -> build users and zipcodes -> set up e-mail settings ->
# fetch and format weather -> compose one message per user -> send via SMTP.
# Fatal problems end the process with a non-zero exit status so that a
# scheduler can detect the failed run.

logger.info("Application started.")

# --- 1. Configuration ------------------------------------------------------
config = Load_Config.load_config()

if config is None:
    logger.error("Configuration loading failed. Exiting.")
    raise SystemExit(1)
logger.info("Configuration loaded successfully.")


# --- 2. Users and the zipcodes they are interested in ----------------------
user_list = Load_Config.Load_users(config)
uniq_zipcodes = Load_Config.Load_zipcodes(user_list)

if not user_list:
    logger.error("No users loaded. Exiting.")
    raise SystemExit(1)
logger.info(f"Loaded {len(user_list)} users.")

if not uniq_zipcodes:
    logger.error("No unique zipcodes found. Exiting.")
    raise SystemExit(1)
logger.info(f"Found {len(uniq_zipcodes)} unique zipcodes.")


# --- 3. E-mail settings -----------------------------------------------------
# A problem here is not fatal: messages are still generated (and logged),
# they are just not sent.
email_sending_possible = False
try:
    # Pass only the email settings section of the config.
    email_config = Send_Email_SMS.EmailConfig.get_instance(config.get('email_settings'))
except (ValueError, TypeError) as e:
    # ValueError: no settings were provided. TypeError is caught as well so
    # that a constructor-signature problem in EmailConfig degrades to
    # "no e-mail" instead of crashing the whole run.
    logger.error(f"Error initializing EmailConfig: {e}")
else:
    if getattr(email_config, '_initialized', False):
        logger.info("Email configuration initialized successfully.")
        email_sending_possible = True
    else:
        logger.error("Email configuration initialization failed. Cannot send emails.")


# --- 4. Weather data --------------------------------------------------------
weather_data = Get_Weather_Data.get_weather(uniq_zipcodes)

# Validate the raw data before converting it.
if not weather_data:
    logger.error("Failed to retrieve weather data. Exiting.")
    raise SystemExit(1)
logger.info("Weather data retrieved successfully.")

human_readable_data = Get_Weather_Data.human_readable_weather(weather_data)


# --- 5. Messages ------------------------------------------------------------
messages = {}  # User instance -> message text
logger.info("Generating messages for users.")
for user in user_list:
    messages[user] = Get_Weather_Data.generate_message(user, human_readable_data)

logger.info("Finished generating messages.")


# --- 6. Delivery ------------------------------------------------------------
if email_sending_possible:
    logger.info("Attempting to send messages.")
    try:
        smtp_server = email_config.smtp_server
        smtp_port = email_config.smtp_port
        sender_email = email_config.sender_email
        sender_password = email_config.sender_password

        # Carrier name -> e-mail-to-SMS gateway domain.
        # Can move to the YAML config eventually.
        carrier_sms_protocol = {
            "AT&T": "txt.att.net",
            "Boost Mobile": "sms.myboostmobile.com",
            "Cricket": "sms.mycricket.com",
            "Verizon": "vtext.com",
            "T-Mobile": "tmomail.net"
        }

        # SMTP_SSL wraps the connection in TLS from the first byte.
        with smtplib.SMTP_SSL(smtp_server, smtp_port) as ssl_context:
            logger.info("SMTP SSL context established.")
            ssl_context.login(sender_email, sender_password)
            logger.info("SMTP login successful.")

            for user, message_body in messages.items():
                # A message that consists of the greeting only carries no
                # forecast. Both sides are stripped: comparing a stripped
                # body with the unstripped header (which ends in "\n\n")
                # could never match, so greeting-only messages were sent.
                header = f"Hello {user.name},\n\nHere is your weather forecast for the next 3 days:\n\n"
                if message_body and message_body.strip() != header.strip():
                    logger.info(f"Sending message to {user.name}")
                    Send_Email_SMS.send_email(ssl_context, sender_email, user.ph_nmbr, user.carrier, "Weather Update", message_body, carrier_sms_protocol)
                else:
                    logger.warning(f"Skipping sending message to {user.name} due to empty or incomplete message.")

    except Exception as e:
        # Top-level boundary: report connection/login failures and finish
        # the run normally.
        logger.error(f"An error occurred during the email sending process: {e}")
else:
    logger.warning("Skipping email sending due to configuration errors.")

logger.info("Application finished.")

Writing __main__.py


# weather_config.yml

# SMTP account used to send the messages; all four keys are required by EmailConfig.
email_settings:
  smtp_server: your_smtp_server
  smtp_port: 587 # NOTE(review): __main__.py connects with smtplib.SMTP_SSL (TLS from the start); 587 is conventionally the STARTTLS port and 465 the implicit-TLS one - confirm with your provider
  sender_email: your_email@example.com
  sender_password: your_email_password

# One entry per recipient; entries missing a key or with non-list zipcodes are skipped by Load_users.
users:
  - name: User One
    ph_nmbr: '1234567890' # Phone number as a string
    carrier: Verizon # must match a key of carrier_sms_protocol in __main__.py: AT&T, Boost Mobile, Cricket, Verizon, T-Mobile
    zipcodes:
      - '10001'
      - '90210'
  - name: User Two
    ph_nmbr: '0987654321'
    carrier: T-Mobile
    zipcodes:
      - '60601'
      - '75001'

# NOTE: not read by any module shown in this notebook; __main__.py hard-codes the carrier -> gateway domain table.
sms_settings:
  "AT&T": ""

# Logger.py

In [2]:
%%writefile Logger.py
import logging
from logging.handlers import TimedRotatingFileHandler
import os

# Create a logs directory if it doesn't exist
# Shared application logger: writes to logs/weather_app.log (rotated weekly)
# and echoes every record to the console. Other modules use it via
# `from Logger import logger`.

# Create the logs directory (relative to the current working directory).
# exist_ok avoids the check-then-create race of a separate exists() test.
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)

# Configure the logger
logger = logging.getLogger("weather_app")
logger.setLevel(logging.INFO)  # Set the minimum logging level

log_file = os.path.join(log_dir, "weather_app.log")
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

# logging.getLogger returns the same object for the same name, so executing
# this module again (re-running the notebook cell, importlib.reload) would
# attach a second pair of handlers and every record would be written twice.
if not logger.handlers:
    # when="W0" rotates weekly on Monday (W0-W6 stand for Monday-Sunday).
    # For weekday-based rotation the interval value is not used; it is kept
    # at 1 for readability. backupCount=5 keeps five rotated files.
    file_handler = TimedRotatingFileHandler(
        log_file, when="W0", interval=1, backupCount=5, encoding="utf-8"
    )
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    # Also send the records to the console (stderr).
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)
    logger.addHandler(stream_handler)

logger.info("Logger configured.")

Writing Logger.py
