In [1]:
%load_ext lab_black
%load_ext autoreload
%autoreload 2

import numpy as np
import pandas as pd
import geopandas
from datetime import date, timedelta
from common import (
    query_mssql_iteratively,
    get_elastic_index,
    SCFH_TO_SLPM_FACTOR,
    P_CUBED_CRS,
    create_emission_source_query_report_id,  ## added to get the Report Id
)
from config import (
    CUSTOMER,
    GLOBAL_LISA_TO_LEAK_CONVERSION_RATE,
    SUPER_EMITTER_SLPM,
    ITALY_PROJECTION_COORD_REF_SYSTEM,
)

In [2]:
CUSTOMER

'italgas'

## Get Emission Sources

# Load old emissions from previous month

In [None]:
# Read the pickled, non-reprocessed emission sources for this customer.
# NOTE(review): the month ("June") is hard-coded in the file name; it has to be
# edited by hand when the notebook is run for another month.
non_reprocessed_emissions_path = (
    f"data/non-reprocessed-emission-sources-{CUSTOMER}-June.pickle"
)
current_month_emissions = pd.read_pickle(non_reprocessed_emissions_path)

# Column dtypes / non-null counts first, then a preview of the first rows.
current_month_emissions.info()
current_month_emissions.head()

In [None]:
current_month_emissions.nunique()

### Load Re-Processed Emission Sources (if available)

In [None]:
# Load the re-processed ("recalculated") emission sources for this customer.
#
# The frame is bound to `emission_sources_recalculated` because the duplicate
# check and the concatenation cells further down read that name. It used to be
# bound to `emission_sources` only, so those cells raised NameError on a fresh
# kernel (Restart & Run All).
# NOTE(review): the file name has no "-" between "recalculated" and the
# customer name; left untouched so it keeps matching the file on disk.
emission_sources_recalculated = pd.read_pickle(
    f"data/non-reprocessed-emission-sources-recalculated{CUSTOMER}-errors.pickle"
)

# Keep the previous binding as well, so a run that skips the concatenation
# step still finds `emission_sources`.
emission_sources = emission_sources_recalculated

# Number of distinct reports, then dtypes / non-null counts, then a preview.
print(emission_sources_recalculated["ReportId"].nunique())
emission_sources_recalculated.info()
emission_sources_recalculated.head(8)

In [None]:
# Number of distinct reports in the re-processed emission sources.
print(emission_sources_recalculated["ReportId"].nunique())

# Rows whose (ReportId, PeakNumber) pair repeats an earlier row.
# This must be the LAST expression of the cell: a bare expression that is
# followed by another statement is evaluated and thrown away, so the
# duplicates were never actually shown before.
emission_sources_recalculated[
    emission_sources_recalculated.duplicated(subset=["ReportId", "PeakNumber"])
]

# Concatenate the two dataframes

In [None]:
# Stack the re-processed emission sources on top of the non-reprocessed ones.
# Row labels are carried over from the inputs, so the resulting index may
# contain repeated labels.
frames = [emission_sources_recalculated, current_month_emissions]
emission_sources = pd.concat(objs=frames, axis=0)

# Dtypes / non-null counts of the combined frame, then a preview.
emission_sources.info()
emission_sources.head()

In [None]:
emission_sources.nunique()

## Convert to Datetime

In [None]:
# Give the plume start/end columns the names used from here on.
emission_sources = emission_sources.rename(
    columns={
        "PlumeEpochStart": "DateReportStarted",
        "PlumeEpochEnd": "SurveyEndDateTime",
    }
)

# Parse both renamed columns as datetimes.
# NOTE(review): the source columns are named "...Epoch..."; if they hold
# numeric epoch values, pd.to_datetime without `unit=` interprets them as
# nanoseconds. Confirm the dtype stored in the pickle.
for timestamp_column in ("DateReportStarted", "SurveyEndDateTime"):
    emission_sources[timestamp_column] = pd.to_datetime(
        emission_sources[timestamp_column]
    )

In [None]:
emission_sources.head()

## Get Leak Investigations

In [3]:
# Leak-investigation Elasticsearch index for each supported customer.
# Alternative italgas index kept for reference: "italgas_leak_investigation".
elasticsearch_index_by_customer = {
    "italgas": "italgas_g2g_leak_investigation",
    "toscana energia": "toscana_leak_investigation",
}

# Fail early on a customer this notebook has no index for.
if CUSTOMER not in elasticsearch_index_by_customer:
    raise ValueError("CUSTOMER should be 'italgas' or 'toscana energia'")

elasticsearch_index = elasticsearch_index_by_customer[CUSTOMER]

# Fetch the whole index through the project helper.
leak_investigations_raw = get_elastic_index(elasticsearch_index)



In [4]:
leak_investigations_raw.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 68221 entries, 0 to 68220
Data columns (total 67 columns):
 #   Column                              Non-Null Count  Dtype 
---  ------                              --------------  ----- 
 0   leakId                              68221 non-null  object
 1   numProgressivo                      68221 non-null  object
 2   lisa                                68221 non-null  object
 3   aereoInterrato                      68221 non-null  object
 4   codiceDispersione                   68221 non-null  object
 5   codStato                            68221 non-null  object
 6   statoFoglietta                      68221 non-null  object
 7   codValidazione                      68221 non-null  object
 8   statoValidazione                    68221 non-null  object
 9   intervento                          68221 non-null  object
 10  dataInserimento                     68221 non-null  object
 11  dataArrivoSulCampo                  68221 non-null  ob

In [None]:
# Restrict the raw leak investigations to the reports present in
# `emission_sources` and keep only the columns listed below.
report_ids = emission_sources.ReportId.unique().tolist()

# Each column is listed exactly once. "dataLocalizzazione" used to appear
# twice, which produced two columns with the same label; selecting that label
# then returned a DataFrame instead of a Series and the datetime conversion
# below failed.
leak_columns_to_keep = [
    "PeakNumber",
    "Region",
    "City",
    "PCubedReportName",
    "BoxId",
    # "LeakGrade",
    "codiceDispersione",
    # "FoundDateTime",
    "dataLocalizzazione",
    # "AG/BG",
    "aereoInterrato",
    # "LeakLocation",
    "indirizzoLocalizzazione",
    "LeakCoordLatLon",
    "PCubedReportGuid",
    "LeakFound",
    "PipelineMeters",
    "AssetCoverageFrac",
    "PriorityScore",  ## added for the emission sources solution
]

leak_investigations = (
    leak_investigations_raw.query("PCubedReportGuid.isin(@report_ids)")[
        leak_columns_to_keep
    ]
    .reset_index(drop=True)
    .copy()
)

# Keep only what follows the last "LISA " in the label and store it as int,
# so it can be compared with the integer PeakNumber of the emission sources.
leak_investigations["PeakNumber"] = (
    leak_investigations["PeakNumber"]
    .str.rsplit("LISA ")
    .map(lambda x: x[-1])
    .astype(int)
)

# "dataLocalizzazione" is parsed as epoch seconds (unit="s").
# `infer_datetime_format` was dropped: it has no effect together with `unit`
# and is deprecated in pandas 2.x.
leak_investigations["dataLocalizzazione"] = pd.to_datetime(
    leak_investigations["dataLocalizzazione"], unit="s"
)


leak_investigations.info()
leak_investigations.head()

### Following the discussion with Noah:
 (https://picarro.slack.com/archives/D034LGQ0FV2/p1649962711721239?thread_ts=1649958068.663449&cid=D034LGQ0FV2)
 - `PriorityScore` and `PeakNumber` in `leak_investigations` are renamed rather than dropped, so that they stay available but can be told apart: a `_leak` suffix is appended to each (`PriorityScore_leak`, `PeakNumber_leak`).
 - Ignore the commented-out cells below. They will be removed once everything works.

In [None]:
# Suffix the two leak-side columns with "_leak" so they stay distinguishable
# after the merge with the emission sources.
leak_column_renames = {
    "PeakNumber": "PeakNumber_leak",
    "PriorityScore": "PriorityScore_leak",
}
leak_investigations = leak_investigations.rename(columns=leak_column_renames)
leak_investigations.info()

In [None]:
def get_latitude(x):
    """Return the latitude item of a split "lat,lon" value.

    Parameters
    ----------
    x : list or any
        Result of splitting a "lat,lon" string on ","; the latitude is the
        first item. Anything that is not a list (for example NaN or None for
        a missing coordinate) is treated as missing.

    Returns
    -------
    object
        ``x[0]`` unchanged (no numeric conversion is done here), or ``np.nan``
        when ``x`` is not a list or is an empty list.
    """
    # isinstance also accepts list subclasses; the length guard avoids an
    # IndexError on an empty list.
    if not isinstance(x, list) or len(x) < 1:
        return np.nan
    return x[0]


def get_longitude(x):
    """Return the longitude item of a split "lat,lon" value.

    Parameters
    ----------
    x : list or any
        Result of splitting a "lat,lon" string on ","; the longitude is the
        second item. Anything that is not a list (for example NaN or None for
        a missing coordinate) is treated as missing.

    Returns
    -------
    object
        ``x[1]`` unchanged (no numeric conversion is done here), or ``np.nan``
        when ``x`` is not a list or has fewer than two items (a value with no
        "," in it splits into a single item and has no longitude).
    """
    # The length guard turns a would-be IndexError on a one-item list into a
    # missing value, matching how non-list inputs are already handled.
    if not isinstance(x, list) or len(x) < 2:
        return np.nan
    return x[1]


# Split the "lat,lon" text once and reuse the parts for both coordinates.
leak_coord_parts = leak_investigations["LeakCoordLatLon"].str.split(",")

# Pick the latitude / longitude item of each split value and store it as
# float; values that could not be split become NaN.
leak_investigations["LeakLatitude"] = leak_coord_parts.map(get_latitude).astype(
    float
)
leak_investigations["LeakLongitude"] = leak_coord_parts.map(get_longitude).astype(
    float
)

### Merge the emission sources with the leak investigations

In [None]:
# Split the emission sources on PeakNumber == 0.
# A boolean mask is used instead of `.drop(index=0)` / `.loc[0]`:
#   * `.drop(index=0)` and `.loc[0]` raise KeyError when no row has PeakNumber 0;
#   * `.loc[0]` returns a Series (not a DataFrame) when exactly one row matches,
#     which would corrupt the concatenation below.
# Going through set_index/reset_index keeps "PeakNumber" as the first column,
# as before.
# NOTE(review): PeakNumber 0 presumably marks emission sources that were never
# reported as a LISA, hence nothing to match on the leak side. Confirm.
emission_sources_by_peak = emission_sources.set_index("PeakNumber")
is_non_reported = emission_sources_by_peak.index == 0

# Left-join the remaining emission sources to their leak investigations on
# (report id, peak number). `validate="1:m"` makes pandas raise if the
# emission-source side has repeated keys.
leak_and_emission_source_join = (
    emission_sources_by_peak[~is_non_reported]
    .reset_index()
    .merge(
        leak_investigations,
        how="left",
        left_on=["ReportId", "PeakNumber"],
        right_on=["PCubedReportGuid", "PeakNumber_leak"],
        validate="1:m",
    )
)

# Append the PeakNumber-0 rows unchanged; their leak-side columns are NaN.
non_reported_emission_sources = emission_sources_by_peak[
    is_non_reported
].reset_index()
leak_and_emission_source_join = pd.concat(
    [leak_and_emission_source_join, non_reported_emission_sources], axis=0
)

leak_and_emission_source_join.info()

## Get Leak Probability

In [None]:
# Leak probability assigned to each "LeakFound" value: 1 for a found gas leak,
# 0 for the two other found/no-gas values, and the global LISA-to-leak
# conversion rate for the not-investigated and in-progress values.
lisa_to_leak_probability_map = {
    "Found_Gas_Leak": 1,
    "Found_Other_Source": 0,
    "No_Gas_Found": 0,
    "Not_Investigated": GLOBAL_LISA_TO_LEAK_CONVERSION_RATE,
    "In_Progress": GLOBAL_LISA_TO_LEAK_CONVERSION_RATE,
}

# Values missing from the map (including NaN for rows without a matching leak
# investigation) come out of `.map` as NaN and fall back to the global rate.
mapped_leak_probability = leak_and_emission_source_join["LeakFound"].map(
    lisa_to_leak_probability_map
)
leak_and_emission_source_join["LeakProbability"] = mapped_leak_probability.fillna(
    GLOBAL_LISA_TO_LEAK_CONVERSION_RATE
)

In [None]:
leak_and_emission_source_join["LeakProbability"].value_counts(dropna=False)

## Get Natural Gas or Possible Natural Gas LISAs

In [None]:
# Remove every row whose DispositionName is "Not_Natural_Gas".
# `errors="ignore"` keeps the cell from raising KeyError when no such row
# exists (for example on a re-run, or for a dataset without that disposition).
# Going through set_index/reset_index makes "DispositionName" the first column
# and renumbers the rows from 0.
leak_and_emission_source_join = (
    leak_and_emission_source_join.set_index("DispositionName")
    .drop(index=["Not_Natural_Gas"], errors="ignore")
    .reset_index()
)
leak_and_emission_source_join.info()
leak_and_emission_source_join.head()

## Save

In [None]:
# Persist the prepared frame for this customer.
leak_and_emission_source_join.to_pickle(
    f"data/prepared-leaks-with-emission-sources-{CUSTOMER}-errors.pickle"
)

# Summary and preview of what was written. `head()` is deliberately the last
# expression of the cell: when it was followed by `to_pickle` its result was
# discarded and never rendered.
leak_and_emission_source_join.info()
leak_and_emission_source_join.head()