# Step 1: Producing data

### From CSV to JSON

Again, a Notebook is not the best tool for this assignment. You should use tools for what they are good at: although you could use a screwdriver to hammer in nails, a hammer is the better tool for the job.

In [1]:
import json  # To convert dictionaries to JSON

import pandas as pd  # Used by the AverageYear / AverageMonth consumers

class CsvConverter:
    """Turn comma-separated text rows into JSON object strings."""

    def __init__(self, keys_str):
        """Remember the column names taken from the CSV header line.

        keys_str is the header line as a single string; it is split on
        commas to obtain the key for each column.

        NOTE(review): one could argue that reading the first line of the
        CSV file is the responsibility of the converter itself.
        """
        self.keys = keys_str.split(',')

    def csv_to_json(self, csv_lines):
        """Return one JSON object string per well-formed row in csv_lines.

        A row is well-formed when it splits into exactly as many values as
        there are keys. Any other row is reported with a printed warning
        and left out of the result, so a single bad row does not abort the
        whole conversion.
        """
        expected_count = len(self.keys)
        converted = []

        for row in csv_lines:
            fields = row.split(',')

            if len(fields) == expected_count:
                # Pair every key with the value in the same column.
                record = dict(zip(self.keys, fields))
                converted.append(json.dumps(record))
            else:
                print("Warning: numbers of keys and values don't match in line:", row)

        return converted
    


### Getting the data

In [2]:
import linecache

class Reader:
    """Read a CSV file in strides and return each stride as JSON strings.

    Line 1 of the file is used as the header; data is served from line 2
    onwards, `stride` lines per call to get_lines().
    """

    def __init__(self, csv_file, stride):
        """Set up the reader.

        csv_file: path of the CSV file to read.
        stride:   maximum number of data lines returned per get_lines() call.
        """
        self.csv_file = csv_file
        self.stride = stride
        self.current_line = 2  # Start from line 2, assuming line 1 is the header
        self.converter = CsvConverter(linecache.getline(csv_file, 1).strip())

    def get_lines(self):
        """Return the next stride of rows as a list of JSON strings.

        The read position advances by the number of lines actually read, so
        successive calls walk through the file. Once the file is exhausted
        an empty list is returned, so the result is always a list and
        callers can simply test it for truthiness.
        """
        lines = []
        for _ in range(self.stride):
            line = linecache.getline(self.csv_file, self.current_line)
            if not line:
                # linecache returns '' past the end of the file.
                break
            lines.append(line.strip())
            self.current_line += 1

        if not lines:
            return []

        return self.converter.csv_to_json(lines)


Create instances of Reader and use the get_lines method to retrieve JSON data in strides

In [3]:
# Create a Reader over 'dSST.csv' and call get_lines three times.
# Each call reads up to `stride` (here 5) lines and advances the reader,
# so consecutive calls return consecutive parts of the file.

reader = Reader('dSST.csv', stride=5)
print(reader.get_lines())  # Returns lines 2-6 as JSON
print(reader.get_lines())  # Returns lines 7-11 as JSON
print(reader.get_lines())  # Returns lines 12-16 as JSON

['{"Year": "1881", "Jan": "-.18", "Feb": "-.13", "Mar": ".04", "Apr": ".06", "May": ".07", "Jun": "-.17", "Jul": ".02", "Aug": "-.02", "Sep": "-.14", "Oct": "-.21", "Nov": "-.17", "Dec": "-.06", "J-D": "-.07", "D-N": "-.08", "DJF": "-.16", "MAM": ".06", "JJA": "-.06", "SON": "-.17"}', '{"Year": "1882", "Jan": ".17", "Feb": ".15", "Mar": ".05", "Apr": "-.16", "May": "-.13", "Jun": "-.22", "Jul": "-.15", "Aug": "-.06", "Sep": "-.13", "Oct": "-.23", "Nov": "-.15", "Dec": "-.36", "J-D": "-.10", "D-N": "-.08", "DJF": ".09", "MAM": "-.08", "JJA": "-.15", "SON": "-.17"}', '{"Year": "1883", "Jan": "-.28", "Feb": "-.36", "Mar": "-.12", "Apr": "-.18", "May": "-.17", "Jun": "-.06", "Jul": "-.06", "Aug": "-.13", "Sep": "-.21", "Oct": "-.11", "Nov": "-.23", "Dec": "-.11", "J-D": "-.17", "D-N": "-.19", "DJF": "-.33", "MAM": "-.15", "JJA": "-.09", "SON": "-.19"}', '{"Year": "1884", "Jan": "-.12", "Feb": "-.07", "Mar": "-.36", "Apr": "-.39", "May": "-.33", "Jun": "-.34", "Jul": "-.32", "Aug": "-.27", 

___________________

___________

# Step 2: Consuming the data

In [8]:
# There's no import of pandas in the cells above...
class AverageYear:
    """Calculate the average temperature anomaly per year.

    Rows are pulled from `reader` one stride at a time; every row (year)
    gets an 'Average' column holding the mean of its numeric columns.
    """

    def __init__(self, reader):
        """Store the reader to pull from and start with an empty result frame."""
        self.reader = reader
        self.dataframe = pd.DataFrame()

    def calculate_avg(self):
        """Drain the reader, compute the per-year averages and print them.

        The processed rows are kept in self.dataframe (indexed by 'Year').
        Nothing is printed when the reader delivers no rows at all.
        """
        # Collect the per-stride frames and concatenate once at the end;
        # re-concatenating the growing result on every stride copies all
        # earlier rows again each time.
        frames = [] if self.dataframe.empty else [self.dataframe]

        # NOTE(review): this loop runs on the calling thread until the reader
        # is exhausted, and it advances the reader's position. A consumer
        # sharing the same Reader object afterwards will therefore not see
        # the rows consumed here.
        while True:
            lines = self.reader.get_lines()
            if not lines:
                break

            df = pd.DataFrame([json.loads(line) for line in lines])
            df = df.set_index('Year')
            # Values arrive as strings; anything non-numeric becomes NaN.
            df = df.apply(pd.to_numeric, errors='coerce')
            # Row-wise mean over every column of the row.
            df['Average'] = df.mean(axis=1, numeric_only=True)
            frames.append(df)

        if frames:
            self.dataframe = pd.concat(frames)

        # Without any data there is no 'Average' column to show.
        if 'Average' in self.dataframe:
            print(self.dataframe['Average'])


class AverageMonth:
    """Calculate the average temperature anomaly per month.

    Rows are pulled from `reader` one stride at a time. Each stride is
    transposed so that the CSV columns become rows, and every such row gets
    an 'Average' over the years contained in that stride.
    """

    def __init__(self, reader):
        """Store the reader to pull from and start with an empty result frame."""
        self.reader = reader
        self.dataframe = pd.DataFrame()

    def calculate_avg(self):
        """Drain the reader, compute per-stride column averages and print them.

        The transposed strides are stacked in self.dataframe, so each CSV
        column appears once per stride. Nothing is printed when the reader
        delivers no rows at all.
        """
        # Collect the per-stride frames and concatenate once at the end;
        # re-concatenating the growing result on every stride copies all
        # earlier rows again each time.
        frames = [] if self.dataframe.empty else [self.dataframe]

        while True:
            lines = self.reader.get_lines()
            if not lines:
                break

            df = pd.DataFrame([json.loads(line) for line in lines])
            df = df.set_index('Year')
            # Values arrive as strings; anything non-numeric becomes NaN.
            df = df.apply(pd.to_numeric, errors='coerce')

            # After transposing, rows are the CSV columns and columns are years.
            df = df.transpose()
            df['Average'] = df.mean(axis=1, numeric_only=True)
            frames.append(df)

        if frames:
            self.dataframe = pd.concat(frames)

        # Without any data there is no 'Average' column to show.
        if 'Average' in self.dataframe:
            print(self.dataframe['Average'])



In [9]:

# Per-year averages: calculate_avg() keeps calling reader.get_lines() until
# it returns nothing, then prints the 'Average' column.
reader = Reader('dSST.csv', stride=5)  # stride is provided here
average_year = AverageYear(reader)
average_year.calculate_avg()

# A fresh Reader is created for the second consumer: a new Reader starts at
# the first data line again, whereas the one above has already been advanced.
reader = Reader('dSST.csv', stride=5)  # stride is provided here
average_month = AverageMonth(reader)
average_month.calculate_avg()



Year
1881   -0.076111
1882   -0.095000
1883   -0.174444
1884   -0.273333
1885   -0.332778
          ...   
2017    0.921667
2018    0.851111
2019    0.973333
2020    1.026111
2021    0.848889
Name: Average, Length: 141, dtype: float64
Jan   -0.198
Feb   -0.148
Mar   -0.130
Apr   -0.216
May   -0.202
       ...  
D-N    0.850
DJF    0.760
MAM    0.810
JJA    0.860
SON    0.950
Name: Average, Length: 522, dtype: float64


### Extending the reader

To support observers, we extend the Reader class with methods for adding, removing, and notifying observers.

In the `__init__` method, we create an empty list to hold the observers.

We also update the `get_lines` method so that it notifies the observers whenever new lines have been read.

In [11]:
import time

class Reader:
    """Read a CSV file in strides and push each stride to observers as JSON.

    Line 1 of the file is used as the header; data is served from line 2
    onwards, `stride` lines per call to get_lines().
    """

    def __init__(self, csv_file, stride, delay=5):
        """Set up the reader.

        csv_file: path of the CSV file to read.
        stride:   maximum number of data lines read per get_lines() call.
        delay:    seconds to sleep at the end of every get_lines() call.
        """
        self.csv_file = csv_file
        self.stride = stride
        self.delay = delay
        self.current_line = 2  # Start from line 2, assuming line 1 is the header
        self.converter = CsvConverter(linecache.getline(csv_file, 1).strip())
        self.observers = []

    def add_observer(self, observer):
        """Register an observer; registering the same one twice has no effect."""
        if observer not in self.observers:
            self.observers.append(observer)

    def remove_observer(self, observer):
        """Unregister an observer; unknown observers are ignored."""
        if observer in self.observers:
            self.observers.remove(observer)

    def notify_observers(self, lines):
        """Call update(lines) on every registered observer."""
        # Iterate over a copy so an observer may detach itself from within
        # its update() without disturbing the iteration.
        for observer in list(self.observers):
            observer.update(lines)

    def get_lines(self):
        """Read the next stride, notify the observers and return the rows.

        The rows are converted to JSON strings before they are handed to
        the observers, since the observers decode each line with
        json.loads. Observers are only notified when there is something to
        report. The method sleeps `delay` seconds before returning the list
        of JSON strings, which is empty once the file is exhausted.
        """
        lines = []
        for _ in range(self.stride):
            line = linecache.getline(self.csv_file, self.current_line)
            if not line:
                # linecache returns '' past the end of the file.
                break
            lines.append(line.strip())
            self.current_line += 1

        json_lines = self.converter.csv_to_json(lines) if lines else []
        if json_lines:
            self.notify_observers(json_lines)

        time.sleep(self.delay)
        return json_lines


### Extending the consumers

We update the AverageYear and AverageMonth classes to include an `update` method.
This method takes the lines as input and performs the same processing as before.

In [12]:
class AverageYear:
    """Observer that calculates the average temperature anomaly per year."""

    def __init__(self, reader):
        """Store the reader and start with an empty result frame."""
        self.reader = reader
        self.dataframe = pd.DataFrame()

    def update(self, lines):
        """Process one batch of JSON lines and print all averages so far.

        lines: JSON object strings, each with a 'Year' key. The first twelve
        remaining columns are averaged per row into an 'Average' column.
        An empty batch is ignored.
        """
        if not lines:
            return

        df = pd.DataFrame([json.loads(line) for line in lines])
        df = df.set_index('Year')
        # The JSON values are strings; convert them so mean() works on
        # numbers (anything non-numeric becomes NaN).
        df = df.apply(pd.to_numeric, errors='coerce')
        df['Average'] = df.iloc[:, 0:12].mean(axis=1)

        # Skip the concat for the very first batch: there is nothing to join yet.
        if self.dataframe.empty:
            self.dataframe = df
        else:
            self.dataframe = pd.concat([self.dataframe, df])
        print(self.dataframe['Average'])

class AverageMonth:
    """Observer that calculates the average temperature anomaly per month."""

    def __init__(self, reader):
        """Store the reader and start with an empty result frame."""
        self.reader = reader
        self.dataframe = pd.DataFrame()

    def update(self, lines):
        """Process one batch of JSON lines and print all averages so far.

        lines: JSON object strings, each with a 'Year' key. The first twelve
        remaining columns are transposed into rows and averaged over the
        years in this batch. An empty batch is ignored.
        """
        if not lines:
            return

        df = pd.DataFrame([json.loads(line) for line in lines])
        df = df.set_index('Year')
        # The JSON values are strings; convert them so mean() works on
        # numbers (anything non-numeric becomes NaN).
        df = df.apply(pd.to_numeric, errors='coerce')

        # After transposing, rows are the CSV columns and columns are years.
        df = df.iloc[:, 0:12].transpose()
        df['Average'] = df.mean(axis=1)

        # Skip the concat for the very first batch: there is nothing to join yet.
        if self.dataframe.empty:
            self.dataframe = df
        else:
            self.dataframe = pd.concat([self.dataframe, df])
        print(self.dataframe['Average'])


In [13]:
# Attach both consumers to a single Reader as observers.
# NOTE(review): this cell only registers the observers; reader.get_lines()
# is not called here, so no observer is notified and nothing is printed yet.
reader = Reader('dSST.csv', stride=5)
average_year = AverageYear(reader)
average_month = AverageMonth(reader)
reader.add_observer(average_year)
reader.add_observer(average_month)

With this setup, every time Reader.get_lines retrieves new lines, it notifies the observers, which update their calculations and print the new averages.

Good; however, I would have liked to see some output here as well 😎.