<h2> Importing libraries </h2>

In [21]:
import requests
from No_sync.credentials import config 
import requests
from pprint import pprint
from authentication import get_token
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

<h2> Handling API requests and data extraction</h2>

In [22]:
def get_token(timeout=30):
  """
  Request an API access token using the OAuth client-credentials grant.

  Posts ``client_id`` / ``client_secret`` from ``config`` (scope 'api') to
  ``config['token_url']`` as a form-encoded body.

  Args:
    timeout: seconds to wait for the token endpoint before giving up; passed
      straight to ``requests.post``. Without a timeout, requests may wait
      indefinitely on a stalled server and hang the notebook cell.

  Returns:
    The decoded JSON token response; the request helpers in this notebook
    read its ``'access_token'`` entry.

  Raises:
    ValueError: if client_id or client_secret is empty in the credentials config.
    requests.HTTPError: if the token endpoint answers with an error status.
    requests.Timeout: if the endpoint does not respond within ``timeout``.
  """

  if not config['client_id']:
    raise ValueError('client_id must be set in credentials.py')

  if not config['client_secret']:
    raise ValueError('client_secret must be set in credentials.py')

  req = requests.post(config['token_url'],
    data={
        'grant_type': 'client_credentials',
        'client_id': config['client_id'],
        'client_secret': config['client_secret'],
        'scope': 'api'
    },
    headers={'content-type': 'application/x-www-form-urlencoded'},
    timeout=timeout)

  req.raise_for_status()
  print('Token request successful')
  return req.json()

In [23]:
def get_week_summary(token, year, week, timeout=30):
  """
  Fetch the fish-health locality summary for one week.

  Args:
    token: token dict as returned by get_token(); its 'access_token' is sent
      as a Bearer token.
    year: year to query; interpolated into the URL path as-is.
    week: week number within ``year``; interpolated into the URL path as-is.
    timeout: seconds to wait for the API before giving up; passed to
      ``requests.get`` so a stalled server cannot hang the cell indefinitely.

  Returns:
    The decoded JSON body (make_df reads its 'year', 'week' and 'localities').

  Raises:
    requests.HTTPError: for an error status code.
    requests.Timeout: if the API does not respond within ``timeout``.
  """
  url = f"{config['api_base_url']}/v1/geodata/fishhealth/locality/{year}/{week}"
  headers = {
    'authorization': 'Bearer ' + token['access_token'],
    'content-type': 'application/json',
  }

  response = requests.get(url, headers=headers, timeout=timeout)
  response.raise_for_status()
  return response.json()

In [24]:
def make_sample_request(token, timeout=30):
  """
  Call the API's sample auth endpoint to check that ``token`` is accepted.

  Args:
    token: token dict as returned by get_token(); its 'access_token' is sent
      as a Bearer token.
    timeout: seconds to wait for the API before giving up; passed to
      ``requests.get`` so a stalled server cannot hang the cell indefinitely.

  Returns:
    The decoded JSON body of the sample endpoint.

  Raises:
    requests.HTTPError: for an error status code (e.g. a rejected token).
    requests.Timeout: if the API does not respond within ``timeout``.
  """
  url = f"{config['api_base_url']}/v1/sample/auth"
  headers = {
    'authorization': 'Bearer ' + token['access_token'],
    'content-type': 'application/json',
  }

  response = requests.get(url, headers=headers, timeout=timeout)
  response.raise_for_status()
  return response.json()


In [25]:
# Request an access token (client-credentials grant, see get_token).
# Only the endpoint is logged: the client_id identifies the account and
# should not end up in saved/shared notebook output.
print(f"Requesting token from {config['token_url']}.")
token = get_token()


Requesting token from https://id.barentswatch.no/connect/token, using client_id dinussen.sivarasalingam@nmbu.no:Fishdata.
Token request successful


In [26]:
# Smoke test: hit the sample endpoint with the fresh token. An HTTP error is
# raised inside make_sample_request, so the message below only prints on success.
response = make_sample_request(token=token)
print('Request to the api was successful - authentication worked.')
pprint(response)

Request to the api was successful - authentication worked.
{'id': 1594208293,
 'name': 'https://www.barentswatch.no',
 'updated': '2023-10-01T15:15:46.1386876+02:00'}


<h2> Data transformation </h2>
<h3> Getting a year's worth of data </h3>

In [27]:
import pandas as pd
from datetime import datetime, timedelta

def weeks_of_the_year(today=None, n_weeks=52):
    """
    Build the (week, year) pairs for the last ``n_weeks`` weeks up to ``today``.

    Weeks are ISO-8601 weeks as returned by ``isocalendar()``. The list ends
    with the week containing ``today``. Stepping back exactly 7 days per row
    visits every ISO week once, so year boundaries and 53-week ISO years
    (e.g. 2020) are handled correctly. Plain day-of-year arithmetic
    ``(day - 1) // 7 + 1`` does not match ISO numbering. It can produce a
    week 53 that does not exist, and it misses week 53 of the previous year.

    NOTE(review): assumes the fish-health API numbers weeks the ISO way - confirm.

    Args:
        today: reference date/datetime; defaults to ``datetime.now()``.
        n_weeks: number of consecutive weeks to return (must be >= 1).

    Returns:
        DataFrame with integer columns 'Week' and 'Year', oldest week first.
        Column order matters: callers unpack rows as ``week, year``.

    Raises:
        ValueError: if ``n_weeks`` is smaller than 1.
    """
    if today is None:
        today = datetime.now()
    if n_weeks < 1:
        raise ValueError('n_weeks must be at least 1')

    rows = []
    for weeks_back in range(n_weeks - 1, -1, -1):
        iso_year, iso_week, _ = (today - timedelta(weeks=weeks_back)).isocalendar()
        rows.append((iso_week, iso_year))
    return pd.DataFrame(rows, columns=['Week', 'Year'])

# The last 52 ISO weeks up to today; consumed by the data-fetching cells below.
weeks_df = weeks_of_the_year()

In [28]:
def make_df(weeksummary):
    """
    Flatten one weekly API summary into a DataFrame with one row per locality.

    Each entry of ``weeksummary['localities']`` (a dict of locality fields)
    becomes a row. The summary's 'year' and 'week' are added as extra columns.
    Rows are built from dicts, so each value lands in the column named by its
    key. A positional build silently misaligns values whenever localities list
    their fields in different orders.

    Args:
        weeksummary: decoded JSON from get_week_summary(). It must contain
            'year' and 'week' whenever it has any localities.

    Returns:
        DataFrame with the locality fields followed by 'year' and 'week'.
        Empty (no columns) when 'localities' is missing, None or empty.
    """
    localities = weeksummary.get('localities') or []
    records = [
        {**locality, 'year': weeksummary['year'], 'week': weeksummary['week']}
        for locality in localities
    ]
    return pd.DataFrame(records)

In [29]:
def get_year_data(weeks_df):
    """
    Download and stack the weekly summaries for every week in ``weeks_df``.

    Makes one API call per row via get_week_summary, using the notebook-level
    ``token``. Each response is flattened with make_df. The per-week frames are
    collected in a list and concatenated once. Concatenating inside the loop
    would re-copy the growing result every week, which is quadratic.

    Args:
        weeks_df: DataFrame whose rows are (week, year) pairs, in that column
            order, e.g. the output of weeks_of_the_year().

    Returns:
        One DataFrame with all localities for all requested weeks and a fresh
        RangeIndex; an empty DataFrame if ``weeks_df`` has no rows.
    """
    weekly_frames = [
        make_df(get_week_summary(token, year, week))
        for week, year in weeks_df.values
    ]
    if not weekly_frames:
        return pd.DataFrame()
    return pd.concat(weekly_frames, ignore_index=True)

In [30]:
# Fetch every week listed in weeks_df (one API call per week) and show the size.
summary_oneyear = get_year_data(weeks_df=weeks_df)
summary_oneyear.shape

(89774, 21)

In [43]:
from pathlib import Path

# Persist the raw year of data. Create Data/ first, because to_csv fails when
# the target directory does not exist (e.g. on a fresh checkout).
summary_csv_path = Path('Data') / 'summary_oneyear.csv'
summary_csv_path.parent.mkdir(parents=True, exist_ok=True)
summary_oneyear.to_csv(summary_csv_path, index=False)


<h3> Getting lice counts for one locality </h3>

In [35]:

def get_lice_counts(weeks_df, locality=10362):
    """
    Collect the weekly rows for a single locality over the weeks in ``weeks_df``.

    Fetches each week's summary (one API call per week, using the
    notebook-level ``token``) and flattens it with make_df. Only rows whose
    'localityNo' equals ``locality`` are kept. Matching frames are
    concatenated once at the end rather than inside the loop.

    Args:
        weeks_df: DataFrame of (week, year) rows, in that column order.
        locality: locality number to keep. The default 10362 is the locality
            shown in this notebook's output (Nmbu Fiskelaboratoriet).

    Returns:
        DataFrame of the matching rows with a fresh RangeIndex; empty if no
        week produced any data.
    """
    matches = []
    for week, year in weeks_df.values:
        week_data = make_df(get_week_summary(token, year, week))
        # A week without localities may yield a frame with no 'localityNo'
        # column; skip it instead of raising KeyError.
        if 'localityNo' not in week_data.columns:
            continue
        matches.append(week_data[week_data['localityNo'] == locality])
    if not matches:
        return pd.DataFrame()
    return pd.concat(matches, ignore_index=True)



 


# Lice counts for the default locality over the same weeks as above.
lice_counts = get_lice_counts(weeks_df, locality=10362)



In [36]:
# Display the weekly rows collected for the selected locality.
lice_counts

Unnamed: 0,localityNo,localityWeekId,name,hasReportedLice,isFallow,avgAdultFemaleLice,hasCleanerfishDeployed,hasMechanicalRemoval,hasSubstanceTreatments,hasPd,...,municipalityNo,municipality,lat,lon,isOnLand,inFilteredSelection,hasSalmonoids,isSlaughterHoldingCage,year,week
0,10362,1465267,Nmbu Fiskelaboratoriet,False,True,,False,False,False,False,...,3021,Ås,59.669233,10.757967,True,True,True,False,2022,41
1,10362,1467823,Nmbu Fiskelaboratoriet,False,True,,False,False,False,False,...,3021,Ås,59.669233,10.757967,True,True,True,False,2022,42
2,10362,1470379,Nmbu Fiskelaboratoriet,False,True,,False,False,False,False,...,3021,Ås,59.669233,10.757967,True,True,True,False,2022,43
3,10362,1474628,Nmbu Fiskelaboratoriet,False,True,,False,False,False,False,...,3021,Ås,59.669233,10.757967,True,True,True,False,2022,44
4,10362,1477753,Nmbu Fiskelaboratoriet,False,True,,False,False,False,False,...,3021,Ås,59.669233,10.757967,True,True,True,False,2022,45
5,10362,1492638,Nmbu Fiskelaboratoriet,False,True,,False,False,False,False,...,3021,Ås,59.669233,10.757967,True,True,True,False,2022,46
6,10362,1495268,Nmbu Fiskelaboratoriet,False,True,,False,False,False,False,...,3021,Ås,59.669233,10.757967,True,True,True,False,2022,47
7,10362,1498468,Nmbu Fiskelaboratoriet,False,True,,False,False,False,False,...,3021,Ås,59.669233,10.757967,True,True,True,False,2022,48
8,10362,1501099,Nmbu Fiskelaboratoriet,False,True,,False,False,False,False,...,3021,Ås,59.669233,10.757967,True,True,True,False,2022,49
9,10362,1504302,Nmbu Fiskelaboratoriet,False,True,,False,False,False,False,...,3021,Ås,59.669233,10.757967,True,True,True,False,2022,50


<h2> Connecting to the Cassandra cluster and loading data </h2>

In [37]:
# Open a CQL session to the Cassandra node on localhost, port 9042.
from cassandra.cluster import Cluster

cluster = Cluster(contact_points=['localhost'], port=9042)
session = cluster.connect()

In [38]:
# Single-node setup: SimpleStrategy with a replication factor of 1.
keyspace_ddl = ("CREATE KEYSPACE IF NOT EXISTS fish_keyspace WITH REPLICATION = "
                "{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 };")
session.execute(keyspace_ddl)

<cassandra.cluster.ResultSet at 0x1fb108eb0d0>

In [39]:
# (Re)create the working table in fish_keyspace, starting from scratch every time.
session.set_keyspace('fish_keyspace')
session.execute("DROP TABLE IF EXISTS fish_keyspace.fish_table;")
# Also drop the table this cell actually creates. Dropping only fish_table
# means the CREATE ... IF NOT EXISTS below keeps my_first_table and its old rows.
session.execute("DROP TABLE IF EXISTS fish_keyspace.my_first_table;")
# NOTE(review): the (ind, company, model) schema does not match the fish-health
# data fetched above - confirm which table/schema the Spark cells should use.
session.execute("CREATE TABLE IF NOT EXISTS my_first_table (ind int PRIMARY KEY, company text, model text);")

<cassandra.cluster.ResultSet at 0x1fb10882d50>

In [40]:
# (Re)create the working table in fish_keyspace, starting from scratch every time.
# NOTE(review): the previous cell performs the same setup; one copy can be removed.
session.set_keyspace('fish_keyspace')
session.execute("DROP TABLE IF EXISTS fish_keyspace.fish_table;")
# Also drop the table this cell actually creates. Dropping only fish_table
# means the CREATE ... IF NOT EXISTS below keeps my_first_table and its old rows.
session.execute("DROP TABLE IF EXISTS fish_keyspace.my_first_table;")
# NOTE(review): the (ind, company, model) schema does not match the fish-health
# data fetched above - confirm which table/schema the Spark cells should use.
session.execute("CREATE TABLE IF NOT EXISTS my_first_table (ind int PRIMARY KEY, company text, model text);")

<cassandra.cluster.ResultSet at 0x1fb10880310>

In [41]:
import os

# Point PySpark at a local JDK. NOTE(review): machine-specific absolute path.
# Raw string: in a normal literal "\P", "\J" and "\j" are invalid escape
# sequences (DeprecationWarning, SyntaxWarning since Python 3.12) that only
# happen to be kept verbatim.
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-20"
# If you are using environments in Python, you can set the environment variables like this:
os.environ["PYSPARK_PYTHON"] = "python" # or similar to "/Users/kristian/miniforge3/envs/tf_M1/bin/python"
#os.environ["PYSPARK_DRIVER_PYTHON"] = "python" # or similar to "/Users/kristian/miniforge3/envs/tf_M1/bin/python"
# Set the Hadoop version to the one you are using, e.g., none:
#os.environ["PYSPARK_HADOOP_VERSION"] = "without"

In [42]:
from pyspark.sql import SparkSession

# Spark session wired to the local Cassandra node through the DataStax
# connector, with its SQL extensions and a Cassandra catalog named "mycatalog".
spark = (
    SparkSession.builder
    .appName('SparkCassandraApp')
    .config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.1')
    .config('spark.cassandra.connection.host', 'localhost')
    .config('spark.sql.extensions', 'com.datastax.spark.connector.CassandraSparkExtensions')
    .config('spark.sql.catalog.mycatalog', 'com.datastax.spark.connector.datasource.CassandraCatalog')
    .config('spark.cassandra.connection.port', '9042')
    .getOrCreate()
)
# Some warnings are to be expected.