In [1]:
# Import Modules
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, TimestampType
from pyspark.sql.functions import coalesce, lit, col, lead, lag
from pyspark.sql.functions import stddev, mean, col
from pyspark.sql.window import Window

from operator import add
from functools import reduce

from googletrans import Translator

# Standard Python Modules
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.dates as mdates
import pandas as pd
import numpy as np
import re

In [2]:
class DataframeTools:
  """Parent class for manipulating spark dataframes.

  Attributes:
    df (dataframe): the dataframe being worked on.
  """
  
  def __init__(self, df):
    self.df = df
  
  
  def update_df(self, new_df):
    """Replace class dataframe with a new one.
    
    Args:
      new_df (dataframe): New dataframe
      
    """
    
    self.df = new_df
  
  
  def append_data(self, new_df):
    """Append another dataframe below the current one (duplicates are kept).
    New dataframe must have the same columns as the original dataframe.
    
    Args:
      new_df (dataframe): New dataframe

    Raises:
      AssertionError: if the two schemas differ.
      
    """
    
    # union() matches columns by position, so the schemas must agree exactly.
    assert self.df.schema == new_df.schema, "Column headers must match!"
    
    current = self.df.count()
    print("Current samples: ", current)
    appended = new_df.count()
    print("Appending samples: ", appended)
    
    self.df = self.df.union(new_df)
    # union() does not de-duplicate, so the joined size is simply the sum;
    # this avoids a third Spark job re-counting the combined data.
    print("Joined samples: ", current + appended)
    print("")
  
  
  def is_null(self, dfs):
    """Print, for every column of dfs, how many rows hold a null value.
    
    Args:
      dfs (dataframe): Dataframe to be checked on
      
    """
    
    print("Number of samples with a null value across columns:")
    heads = [field.name for field in dfs.schema]
    if heads:
      # One aggregation job over all columns instead of one count() per column:
      # each column becomes a 0/1 null flag, then all flags are summed at once.
      flags = [dfs[head].isNull().cast("int").alias("_null_{}".format(i))
               for i, head in enumerate(heads)]
      totals = dfs.select(flags).groupBy().sum().first()
      for i, head in enumerate(heads):
        # A sum over zero rows is null; report 0 like a per-column count() would.
        print(head, totals[i] or 0)
    print("")
    
    
  def null2zero(self, head, dfs):
    """Change null values to zero in column
    
    Args:
      head (str): Column name
      dfs (dataframe): Dataframe given

    Returns:
      dataframe: dfs with nulls in column `head` filled with 0
      
    """
    
    print("Replacing null values with zero...\n")
    return dfs.na.fill(0, head)



In [3]:
class GroupDataTools(DataframeTools):
  """Subclass of DataframeTools to reorganise dataframes into dictionaries.
  Tools for visualisation and preprocessing included.

  Attributes:
    df (dataframe): dataframe being worked on (managed by DataframeTools).
    df_dict (dict): filled by groupdata(); maps each distinct group value to a
      dataframe holding that group's (x, y) columns.
    headers (list): column names of df at construction time.
  """
  
  def __init__(self, df, df_dict=None):
    """
    Args:
      df (dataframe): dataframe to reorganise
      df_dict (dict, optional): dictionary to fill with groups; a new empty
        dict is created when omitted
    """
    super().__init__(df)
    # A fresh dict per instance: a shared `{}` default would carry groups over
    # between instances (e.g. when a notebook cell is re-run).
    self.df_dict = {} if df_dict is None else df_dict
    self.headers = [h.name for h in df.schema]
    
    
  def groupdata(self, dict_group_head, x_head, y_head):
    """Collect rows which contain the same value in dict_group_head column and 
    put them into self.df_dict.
    
    Keys: distinct value (None for a null value), Value: dataframe with the
    x_head and y_head columns of the rows containing that value, built from
    the dataframe sorted by x_head.
    
    Args:
      dict_group_head (str): column to split the dataframe
      x_head (str): first column (normally time) to reconstruct corresponding dataframe
      y_head (str): second column (normally value) to reconstruct corresponding dataframe

    Raises:
      AssertionError: if dict_group_head is not a column of the current dataframe.
    
    """
    
    # Check the current columns: self.headers is not refreshed when df is replaced.
    assert dict_group_head in self.df.columns, "Header does not exist in dataframe!"
    
    keys = [row[dict_group_head]
            for row in self.df.select(dict_group_head).distinct().collect()]
    group_col = self.df[dict_group_head]
    ordered = self.df.orderBy(x_head)

    for key in keys:
      # eqNullSafe so a null group value still matches its rows (`== None` matches nothing).
      self.df_dict[key] = ordered.where(group_col.eqNullSafe(key)).select(self.df[x_head], self.df[y_head])
  
  
  def splitdata_dict(self, regex):
    """Make a dictionary of the df_dict entries whose key matches any of the given patterns.
    E.g. Return a dictionary of a certain oilwell given from a code in the tag name.

    Args:
      regex (list | str | compiled pattern): pattern(s) tried with re.match
        (anchored at the start of the key); a single pattern is also accepted

    Returns:
      dict: matching subset of df_dict, in df_dict order; non-string keys are skipped
      
    """
    
    # A lone string would otherwise be iterated character by character.
    if isinstance(regex, str) or hasattr(regex, "match"):
      patterns = [regex]
    else:
      patterns = list(regex)

    return {
      key: val
      for key, val in self.df_dict.items()
      if isinstance(key, str) and any(re.match(p, key) for p in patterns)
    }
    
    
  def decode_keys(self, in_dict, decode_dict):
    """Replace the old keys of in_dict (in place) with new key definitions.

      Each key is renamed using the first pattern in decode_dict that matches it
      (re.match, anchored at the start of the key); unmatched keys are kept.
      If two old keys map to the same new key, the later one overwrites the earlier.

      Args:
        in_dict (dict): input dictionary, modified in place
        decode_dict (dict): dictionary of old-key patterns with new definitions

      Returns: None
      
    """

    # Iterate over a snapshot: in_dict is modified inside the loop, and changing
    # a dict's keys while iterating it raises RuntimeError.
    for key in list(in_dict):
      if not isinstance(key, str):
        continue
      for pattern, new_key in decode_dict.items():
        if re.match(pattern, key):
          print("Replacing ", key, ", with ", new_key)
          in_dict[new_key] = in_dict.pop(key)
          # The old key is gone now; a second matching pattern would fail on pop().
          break
    print("")
    
    return None
  
  
  def plot_ts(self, title, x_head, y_head, ts_df_list, label_list=("value",)):
    """Plot multiple timeseries dataframe onto a figure, x axis = time, y axis = value.

    Args:
      title (str): Name of dataframe
      x_head (str): Name of column to be plotted along x axis
      y_head (str): Name of column to be plotted along y axis
      ts_df_list (list): list of timeseries dataframes to plot
      label_list (sequence): plot labels, matched to ts_df_list by position;
        dataframes beyond the last label are still plotted, without a legend entry

    """
    
    ts_df_list = list(ts_df_list)
    labels = list(label_list)
    # Pad the labels so zip() does not silently drop the extra dataframes.
    labels += [None] * (len(ts_df_list) - len(labels))

    fig, ax = plt.subplots(1, 1, figsize=(24, 10))

    for ts_df, lab in zip(ts_df_list, labels):
      ts_pd = ts_df.orderBy(x_head).toPandas()
      ax.plot(ts_pd[x_head].tolist(), ts_pd[y_head].tolist(), ".--", label=lab)

    ax.set_title(title, fontsize=16)
    ax.set_xlabel(x_head, fontsize=16)
    ax.set_ylabel(y_head, fontsize=16)

    ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d %H:%M"))
    ax.xaxis.set_minor_formatter(mdates.DateFormatter("%Y-%m-%d"))

    ax.legend(loc="best")

    fig.tight_layout()
    # `display` is provided by the notebook environment, not imported here.
    display(fig)
    # Release the figure so repeated calls do not accumulate open figures.
    plt.close(fig)
    

  @staticmethod
  def weighted_average(ts_df, x_head, y_head, offsets, weights):
    """Produce a rolling weighted-average column for the given ts data.

      For each row the result is sum(w * y[row + offset]) / len(offsets) over the
      (offset, weight) pairs; neighbours that fall outside the data count as 0.

      Args:
        ts_df (dataframe): timeseries dataframe (not used in the computation;
          kept for interface compatibility)
        x_head (str): header name of x axis in timeseries (e.g. datetime)
        y_head (str): header name of y axis in timeseries (e.g. value)
        offsets (list): list of adjacent values to consider (negative = earlier rows)
        weights (list): list of weights applied to offsets

      Returns:
        Column: expression to attach to the timeseries dataframe via select/withColumn

      Raises:
        AssertionError: if offsets and weights differ in length

    """
    assert len(weights) == len(offsets)

    # No partitionBy: the whole series is treated as one ordered sequence.
    window = Window.orderBy(x_head)
    value_col = col(y_head)

    def shifted(i):
      """Value i rows away in x_head order (negative = earlier, positive = later)."""
      if i < 0:
        return lag(value_col, -i).over(window)
      if i > 0:
        return lead(value_col, i).over(window)
      return value_col

    terms = [coalesce(shifted(i) * w, lit(0)) / len(offsets) for i, w in zip(offsets, weights)]

    return reduce(add, terms, lit(0))
  
  
  def view_moving_avg(self, title, x_head, y_head, y_label, ts_df, offsets, weights):
    """Plot the moving average of the given ts data.

      Args:
        title (str): Title of graph
        x_head (str): Header name for x column data
        y_head (str): Header name for y column data
        y_label (str): y axis label
        ts_df (dataframe): Time series dataframe
        offsets (list): list of adjacent values to consider
        weights (list): list of weights applied to offsets

    """

    avg = self.weighted_average(ts_df, x_head, y_head, offsets, weights)
    # Name the averaged column after y_label so plot_ts uses it as the y axis label.
    avg_df = ts_df.select(ts_df[x_head], avg.alias(y_label))

    self.plot_ts(title, x_head, y_label, [avg_df], [y_head])
    
    

In [4]:
# Load data into the notebook. Both dumps share one layout, so a single helper
# reads each file and renames/casts its columns.
def load_dump(path):
  """Read a raw tag-dump CSV and normalise its columns.

  Args:
    path (str): location of the CSV file

  Returns:
    dataframe: columns index, datetime (parsed with "dd-MMM-yy HH:mm:ss"), tag, value
  """
  raw = spark.read.format('csv').options(header='true', inferSchema='true').load(path)
  return raw.select(
    raw["Unnamed: 0"].alias("index"),
    F.to_timestamp(F.col("ts").cast("string"), "dd-MMM-yy HH:mm:ss").alias("datetime"),
    raw["name"].alias("tag"),
    raw["value"]
  )

df_01 = load_dump('/FileStore/tables/newdump_01.csv')
df_02 = load_dump('/FileStore/tables/newdump_02.csv')

display(df_02.select("tag").distinct())

In [5]:
# Combine both dumps into one explorer.
explore = GroupDataTools(df_01)
explore.append_data(df_02)

# Null report before and after zero-filling the value column.
explore.is_null(explore.df)
explore.update_df(explore.null2zero("value", explore.df))
explore.is_null(explore.df)

In [6]:
# One dataframe per tag, keyed by tag name.
explore.groupdata("tag", "datetime", "value")

# Tag-name patterns selecting each oil well's tags (re.match: anchored at the start).
well_patterns = {
  "OW1": [re.compile('BRA-....-..-07.'), re.compile('BRA-QT  -15-0077-RAW')],
  "OW3": [re.compile('BRA-....-..-01.'), re.compile('BRA-QT  -15-0017-RAW')],
  "OW2": [re.compile('BRA-....-..-04.')],
}

OW1 = explore.splitdata_dict(well_patterns["OW1"])
OW3 = explore.splitdata_dict(well_patterns["OW3"])
OW2 = explore.splitdata_dict(well_patterns["OW2"])

# Tag-pattern -> short measurement name used as the new dictionary key.
tag_names = {
              "BRA-PZT........" : "WHP",
              "BRA-TT  -15...." : "WHT",
              "BRA-FI........." : "GLR",
              "BRA-PT  -16...." : "GLP",
              "BRA-PT  -13...." : "DHP",
              "BRA-TT  -13...." : "DHT",
              "BRA-HV........." : "Choke",
              "BRA-QT........." : "ASD"
}

for well in (OW1, OW3, OW2):
  explore.decode_keys(well, tag_names)


In [7]:
# Every OW1 pressure/temperature series from 2016 onwards.
ts_labels = ["DHP", "WHP", "DHT", "WHT", "GLP"]
ts_dfs = [OW1[label] for label in ts_labels]

ge2016 = [series.where(series.datetime >= '2016-01-01') for series in ts_dfs]

explore.plot_ts("WHP, WHT, DHP, DHT, GLP over time", "datetime", "value", ge2016, ts_labels)

In [8]:
# Pressure series only, from 2016 onwards.
ts_labels = ["DHP", "WHP", "GLP"]
ts_dfs = [OW1[label] for label in ts_labels]

ge2016p = [series.where(series.datetime >= '2016-01-01') for series in ts_dfs]

explore.plot_ts("WHP, DHP, GLP over time", "datetime", "value", ge2016p, ts_labels)

In [9]:
# Temperature series only, from 2016 onwards.
ts_labels = ["DHT", "WHT"]
ts_dfs = [OW1[label] for label in ts_labels]

ge2016t = [series.where(series.datetime >= '2016-01-01') for series in ts_dfs]

explore.plot_ts("WHT, DHT over time", "datetime", "value", ge2016t, ts_labels)

In [10]:
# Daily rate data for OW1 (date plus liquid and oil rate columns).
OW1_ql_df = spark.read.format('csv').options(header='true', inferSchema='true').load('/FileStore/tables/Qliq.csv')

# Both rate series share the same parsed date column.
ql_datetime = F.to_timestamp(F.col("DATE").cast("string"), "MM/dd/yyyy").alias("datetime")

# Add each rate to the oil-well dictionary as a (datetime, value) dataframe.
for key, rate_col in (("LR", "Daily liquid rate [Sm3/d]"), ("OR", "Daily oil [Sm3/d]")):
  OW1[key] = OW1_ql_df.select(ql_datetime, OW1_ql_df[rate_col].alias("value"))

# TODO: check whether WHP, DHP, GLR here match the values in the dump data.

In [11]:
# Try to find the interventions that occurred over the years: z-score each
# series so pressures and rates can share one axis.
ts_labels = ["DHP", "WHP", "GLP", "OR", "LR"]
ts_dfs = [OW1[label] for label in ts_labels]

# NOTE(review): this cutoff is 2016-01-11 while the earlier plots use 2016-01-01 — confirm intended.
ge2016norm = []
for series in ts_dfs:
  recent = series.where(series.datetime >= '2016-01-11')
  # mu/sigma rather than mean/std: a global `mean` would overwrite the
  # pyspark `mean` function imported at the top of the notebook.
  mu, sigma = recent.select(F.mean("value"), F.stddev("value")).first()
  ge2016norm.append(recent.select(recent["datetime"], ((col("value") - mu) / sigma).alias("value")))

explore.plot_ts("Normalised WHP, DHP, GLP, OR, LR over time", "datetime", "value", ge2016norm, ts_labels)

In [12]:
# Try to find the interventions that occurred over the years: z-scored
# pressures against the liquid rate.
ts_labels = ["DHP", "WHP", "LR"]
ts_dfs = [OW1[label] for label in ts_labels]

ge2016norm = []
for series in ts_dfs:
  recent = series.where(series.datetime >= '2016-01-11')
  # mu/sigma rather than mean/std: a global `mean` would overwrite the
  # pyspark `mean` function imported at the top of the notebook.
  mu, sigma = recent.select(F.mean("value"), F.stddev("value")).first()
  ge2016norm.append(recent.select(recent["datetime"], ((col("value") - mu) / sigma).alias("value")))

explore.plot_ts("Normalised WHP, DHP, LR over time", "datetime", "value", ge2016norm, ts_labels)

In [13]:
# Try to find the interventions that occurred over the years: each series is
# divided by its standard deviation only (not centred on its mean).
ts_labels = ["DHP", "WHP", "LR"]
ts_dfs = [OW1[label] for label in ts_labels]

ge2016norm = []
for series in ts_dfs:
  recent = series.where(series.datetime >= '2016-01-11')
  # Only the std is needed; avoids recomputing (and overwriting the imported) `mean`.
  sigma = recent.select(F.stddev("value")).first()[0]
  ge2016norm.append(recent.select(recent["datetime"], (col("value") / sigma).alias("value")))

explore.plot_ts("Standardised WHP, DHP, LR over time", "datetime", "value", ge2016norm, ts_labels)

In [14]:
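# TODO: disabled. view_moving_avg is a GroupDataTools method taking
# (title, x_head, y_head, y_label, ts_df, offsets, weights); the call below passes
# only five arguments and plots OW1["GLR"] under a WHP title — fix before re-enabling.
# offsets, weights =  [-5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5], [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
# view_moving_avg("Moving average of OW1 WHP", "WHP (Bar)", OW1["GLR"], offsets, weights)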

In [15]:
# Import in the data:
interference_df = spark.read.format('csv').options(header='true', inferSchema='true', delimiter='|', encoding='iso-8859-1').load('/FileStore/tables/interences_filtered.csv')

# Translate each distinct Norwegian description to English (one translator
# request per description). Nulls are skipped: there is nothing to translate.
translator = Translator()
descriptions = [
  row["Description"]
  for row in interference_df.select("Description")
                            .where(F.col("Description").isNotNull())
                            .distinct()
                            .collect()
]
no2eng_dict = {desc : translator.translate(desc, src='no', dest='en').text for desc in descriptions}

# With a dict, replace() takes old -> new pairs directly (no value argument needed).
if no2eng_dict:
  interference_df = interference_df.na.replace(no2eng_dict, subset="Description")

# Pick out only date and description columns
OW1["Interf"] = interference_df.select(
                 F.to_timestamp(F.col("Date").cast("string"), "MM/dd/yyyy").alias("datetime"),
                 interference_df["Description"]
)

# def translate_string(sentence):
#   translator = Translator()
#   english = translator.translate(sentence, src='no', dest='en').text  
#   return english
# udf = F.udf(translate_string, StringType())
# int_df = interference_df.select(interference_df["Date"], interference_df["Description"])
# int_df = int_df.withColumn("Desc", udf("Description"))
# display(int_df)

In [16]:
# Tag each WHP reading since 2017-01-11 with the interference code (if any)
# logged on the same day, then scatter-plot the readings coloured by code.
whp_df = OW1["WHP"].where(OW1["WHP"].datetime >= '2017-01-11')

# Join key: the reading's timestamp truncated to the start of its day, which
# lines up with the midnight timestamps parsed from the interference dates.
# (Rounding epoch seconds to the nearest 86400 pushed afternoon readings onto
# the following day and aligned days to UTC instead of the session time zone.)
whp_df = whp_df.select(
  whp_df["datetime"].alias("datetime_orig"),
  F.date_trunc("day", F.col("datetime")).alias("datetime"),
  whp_df["value"]
)

new_df = whp_df.join(OW1["Interf"], on=['datetime'], how='left_outer')

# Convert distinct comments into numbers starting at 1, so that 0 stays free to
# mean "no interference" (the fill value for unmatched readings below).
numerate = {val : str(i) for i, val in enumerate(no2eng_dict.values(), start=1)}

if numerate:
  new_df = new_df.na.replace(numerate, subset="Description")
new_df = new_df.select(
                new_df["datetime_orig"].alias("datetime"),
                new_df["value"],
                new_df["Description"].cast(IntegerType())
)

new_df = new_df.na.fill(0, "Description")

ts_pd = new_df.toPandas()

# Set pretty params
sns.set()

# x/y as keywords: seaborn >= 0.12 no longer accepts them positionally.
sns.lmplot(x='datetime', y='value', data=ts_pd, hue='Description', fit_reg=False)


# groups = ts_pd.groupby('Description')

# print(groups)

# # Plot
# fig, axs = plt.subplots(1, 4, figsize=(24, 10))
# #axs.margins(0.05) # Optional, just adds 5% padding to the autoscaling
# axs.flatten()

# for ax in axs:
#   g
#   for name, group in groups:
#       ax.plot(group.datetime, group.value, marker='o', linestyle='', ms=12, label=name)

# ax.grid(True)
# ax.legend(loc="best")
# fig.tight_layout()

# display(fig)

In [17]:
# Same std-scaled DHP/WHP/LR series as the "Standardised" plot, drawn as solid lines.
# x_head, y_head and title were previously undefined here (NameError on a fresh kernel).
x_head, y_head = "datetime", "value"
title = "Standardised WHP, DHP, LR over time"

ts_labels = ["DHP", "WHP", "LR"]
ts_dfs = [OW1[label] for label in ts_labels]

ge2016norm = []
for series in ts_dfs:
  recent = series.where(series[x_head] >= '2016-01-11')
  # Only the std is needed; avoids overwriting the imported pyspark `mean`.
  sigma = recent.select(F.stddev(y_head)).first()[0]
  ge2016norm.append(recent.select(recent[x_head], (col(y_head) / sigma).alias(y_head)))


fig, ax = plt.subplots(1, 1, figsize=(24, 10))

for ts_df, lab in zip(ge2016norm, ts_labels):
  ts_pd = ts_df.orderBy(x_head).toPandas()
  ax.plot(ts_pd[x_head].tolist(), ts_pd[y_head].tolist(), label=lab)

ax.set_title(title, fontsize=16)
ax.set_xlabel(x_head, fontsize=16)
ax.set_ylabel(y_head, fontsize=16)

ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d %H:%M"))
ax.xaxis.set_minor_formatter(mdates.DateFormatter("%Y-%m-%d"))

ax.legend(loc="best")

fig.tight_layout()
display(fig)
plt.close(fig)