# M.0 Data initialization

In [1]:
import datetime

import time

import pyspark
import pyspark.sql.functions as f
from pyspark import SparkContext
from pyspark.sql import *
from pyspark.sql.types import *

import pandas as pd
import numpy as np

import warnings
warnings.filterwarnings('ignore')

In [2]:
NUMBER_OF_THREADS_TO_USE = "*"
spark = SparkSession.builder \
    .master('local[' + NUMBER_OF_THREADS_TO_USE + ']') \
    .appName('discret_pierre_signature_generation') \
    .config('spark.driver.memory', '200g') \
    .config('spark.driver.maxResultSize', '15g') \
    .config('spark.rapids.sql.enabled','true') \
    .getOrCreate()
sc = spark.sparkContext

spark

In [3]:
# those are the metadata used to sum the number of requests
meta = ['WeeksGroup', 'LocationId', 'MinuteWithinWeek']

# those are the metrics once we have gathered antennas and stuff
metrics = ['Voice','SMS_3G','PS','CS','Call','SMS_4G','Service_Req','HO']
#metrics = ['Call','SMS','Data','Mobility','Signalling','Emergency','Overload']

SourceParquetFilesLoc = '/WORKSPACE/Pierre/Cancan2022/Cancan2022_Paris/'
MedianParquetFilesLoc = '/WORKSPACE/Pierre/Cancan2022/Cancan2022_Paris_meds/'

ParquetFilesSignaturesLoc = '/WORKSPACE/Pierre/Cancan2022/Cancan2022_Paris_sigs/'
ParquetFilesDistribsLoc = '/WORKSPACE/Pierre/Cancan2022/Cancan2022_Paris_distribs/'

ParquetFilesALRsLoc = '/WORKSPACE/Pierre/Cancan2022/Cancan2022_Paris_ALR/'
CsvFilesThresholdsLoc = '/WORKSPACE/Pierre/Cancan2022/Cancan2022_Paris_Thresholds.csv'


In [4]:
print(ParquetFilesSignaturesLoc)
print(ParquetFilesDistribsLoc)
print(ParquetFilesALRsLoc)
print(CsvFilesThresholdsLoc)

/WORKSPACE/Pierre/Cancan2022/Cancan2022_Paris_sigs/
/WORKSPACE/Pierre/Cancan2022/Cancan2022_Paris_distribs/
/WORKSPACE/Pierre/Cancan2022/Cancan2022_Paris_ALR/
/WORKSPACE/Pierre/Cancan2022/Cancan2022_Paris_Thresholds.csv


## 0.1 Load and format data

In [5]:
MediansDFSP = spark.read.parquet(MedianParquetFilesLoc)

MediansDFSP.printSchema()

OriginalDataDFSP = spark.read.parquet(SourceParquetFilesLoc)

OriginalDataDFSP.printSchema()

#Medians = MediansDFSP.toPandas()

#MediansDFSP.show(10)

#print(Medians.info(verbose=True))

#lMinutes = []
#for i in range(0,7*24*60):
#    lMinutes.append([i])
#dfMinutes = sc.parallelize(lMinutes).toDF(["MinuteWithinWeek"])
#dfMinutes = dfMinutes.withColumn("MinuteWithinWeek", dfMinutes.MinuteWithinWeek.cast('integer'))


#MediansDF = pd.read_parquet(ParquetFilesLoc)



root
 |-- MinuteWithinWeek: integer (nullable = true)
 |-- Voice: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- SMS_3G: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- PS: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- CS: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- Call: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- SMS_4G: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- Service_Req: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- HO: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- WeeksGroup: integer (nullable = true)
 |-- LocationId: integer (nullable = true)

root
 |-- time_utc: string (nullable = true)
 |-- time_local: timestamp (nullable = true)
 |-- Voice: long (nullable = true)
 |-- PS: long (nullable = true)
 |-- SMS_3G: long (nullable = true)
 |-- CS: lon

In [6]:
LocIdsList = sorted([x.LocationId for x in MediansDFSP.select('LocationId').distinct().collect()])

print("found " + str(len(LocIdsList)) + " location groups")

#print(LocIdsList)

AllWeeksIdsList = sorted([x.WeekOfYear for x in OriginalDataDFSP.select('WeekOfYear').distinct().collect()])

print("All Weeks Ids:")
print(AllWeeksIdsList)


AllWeeksGroupsList = sorted([x.WeeksGroup for x in OriginalDataDFSP.select('WeeksGroup').distinct().collect()])

print("All Weeks Ids:")
print(AllWeeksGroupsList)



found 1026 location groups
All Weeks Ids:
[11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24]
All Weeks Ids:
[0, 1, 2]


In [7]:


# select only the medians
for m in metrics:
    MediansDFSP = MediansDFSP.withColumn(m, MediansDFSP[m].getItem(1))
#MediansDFSP = MediansDFSP.select()



In [8]:
from tqdm import tqdm

from anr_discret import offlineMLbyPL

filteredDFs = []


#start_time = time.time()
with tqdm(total=len(LocIdsList)*len(AllWeeksGroupsList), desc='Signature extraction') as pbar:
    for WGrp in AllWeeksGroupsList:
    
        # run 2 imbricated loops, this hasn't much consequence on the output thanks to the filter / partitioning thing 
        for LocId in LocIdsList:
            #if LocId>10:
            #    break
            # some minutes may be missing. Since they are already sorted, we use MinuteWithinWeek as an index 
            # converting the filtered data to pandas dataframe seems to be actually quite fast ; using the index is straitforward too
            df = MediansDFSP.filter((MediansDFSP.LocationId == LocId) & (MediansDFSP.WeeksGroup == WGrp)).toPandas().set_index('MinuteWithinWeek')

            # fill the missing minutes ; don't forget to set the proper value to columns weeksgroup and locationid
            df = df.reindex(range(0,24*7*60), fill_value=0).assign(LocationId=LocId, WeeksGroup=WGrp)

            # now the DataFrame is ready to compute the butterworth filter
            filt = offlineMLbyPL.signature_filtered(df, cutoff=8, metrics=metrics).reset_index()
            # reset index because we want the "minutewithinweek" column back as a standard column

            # use the append function not to slow down computation because of the concatenation thing
            filteredDFs.append(filt)

            pbar.update(1)

            
# just concat everything at the same time
FilteredSignatureDF = pd.concat(filteredDFs)

# save as parquet because it's much more efficient - keep the same partition structure although it's not really good
# don't forget to remove the "index thing" (although we know that the actual index is ['WeeksGroup','LocationId', 'MinuteWithinWeek'])
#FilteredSignatureDF.to_parquet(path=ParquetFilesSignaturesLoc, partition_cols=['WeeksGroup','LocationId'], index=False)



Signature extraction: 100%|██████████| 3078/3078 [07:09<00:00,  7.16it/s]


In [9]:
FilteredSignatureDF.to_parquet(path=ParquetFilesSignaturesLoc, partition_cols=['WeeksGroup','LocationId'], index=False)

In [17]:
import scipy.stats as stats

SigmaSep = 2.32          # how do we select the set of absolute error values to 


def fit_distribution_eric_local(df:pd.DataFrame, metrics:list) -> dict:
    """Fit a Gamma distribution to the given columns the input dataframe.

    Parameters
    ----------
    df: pandas.core.frame.DataFrame
        The input dataframe (usually containing AE values)
    metrics: list
        The list of column names to fit the Gamma distribution

    Returns
    -------
    distrib: dict
        The dictionary containing, for each column name, the Gamma parameters
    """

    distrib = {}

    #compter nombre de cas où on est à np.inf pour vérifier que ça n'arrive pas
    df = df.replace([np.inf, -np.inf], np.nan)

    
    for m in metrics:
        # pour éviter sort : hypothèse distribution gaussienne + estimer à partir de mean et std
        # alpha = 2.32 std pour exclure 99% des donnée
        # alpha = 1.5 std pour exclure 95% données
        x_mean = df[m].mean()
        x_std = df[m].std()
        SeparationThresh = x_mean + SigmaSep * x_std
        x = df[m].loc[df[m] > SeparationThresh]
        
        # tout écart inférieur à seuil aura comme probabilité 1-proba
        proba = len(x.index) / len(df.index)
        
        # donc on stocke seuil et on compare dans la phase de détection à seuil
        # si value < seuil, on retourne 1-proba
        # sinon, on regarde la sf de la loi et on multiplie par proba
        # regarder la manière dont cette loi est respectée
        firsttuple = (SeparationThresh, proba, len(x.index))

        if len(x.index)>=3:
            # regarder options pour supprimer outliers dans la fonction gamma fit
            fittuple = stats.gamma.fit(x)
            # before the code used to translate this tuple used to be the following:

            distrib[m] = firsttuple + (fittuple[0], fittuple[-2], fittuple[-1])
        else:
            distrib[m] = firsttuple + (np.nan, np.nan, np.nan)
    return distrib








totalALR = []




MinALSThreshold = 1 / (60*24*365.25*10)




def compute_alr_eric_local(df:pd.DataFrame, distrib:dict, metrics:list) -> pd.DataFrame:
    """Computes the Anomaly Likelihood Rate (ALR) over the input dataframe.
    The ALR corresponds to the sum of the logs of the p-value for each service data.
    The p-value is obtained for each service data by fitting a Gamma distribution over the dataset.

    Parameters
    ----------
    df: pandas.core.frame.DataFrame
        The input dataframe (usually containing AE values)
    distrib: dict
        The error distribution parameters for all services
    metrics: list
        The list of column names corresponding to the service data

    Returns
    -------
    df: pandas.core.frame.DataFrame
        The same dataframe with the additional ALR column
    """

    res = pd.DataFrame().reindex_like(df)
    als = pd.DataFrame(index = res.index, columns = metrics)
    
    CopyCols = list(set(df.columns) - set(metrics))
    res[CopyCols] = df[CopyCols].copy(deep=True)

    for m in metrics:
        mLoc = df.columns.get_loc(m)
            
        SeparationThresh = distrib.loc['thresh',m]
        proba = distrib.loc['proba',m]
        nvalues = distrib.loc['nvalues',m]
        arg = distrib.loc['k',m]
        loc = distrib.loc['loc',m]
        scale = distrib.loc['theta',m]

        if nvalues<3:
            #print("case where the gamma law was not fitted on metric " + m)
            res.loc[:,m] = pd.Series(np.nan, index=df.index)
        else:
            # default value is 1-proba
            res.loc[:,m] = pd.Series((1. - proba), index=df.index)
            # the gamma law was fitted on this metric
            indexThresh = df.index[df[m]>SeparationThresh]

            res.loc[indexThresh,m] = (1. - proba) * stats.gamma.sf(df.loc[indexThresh, m], arg, loc=loc, scale=scale)
            res.loc[indexThresh,m].clip(lower=MinALSThreshold, inplace=True)

        als[m] = pd.Series(np.log(res[m]), index=res.index)

    res['ALR'] = als.sum(axis=1)
    return res






signatureDFs = []

ColumnsDropList = []
for m in metrics:
    ColumnsDropList.append(m+'_ref')

count = 0

distributionsDFList = []



#SmallerLocIdsList = LocIdsList[0:50]

    
#with tqdm(total=len(LocIdsList)*len(AllWeeksGroupsList), desc='Signature extraction') as pbar:
with tqdm(total=len(LocIdsList)*1, desc='Signature extraction') as pbar:
        
    # run 2 imbricated loops, this hasn't much consequence on the output thanks to the filter / partitioning thing 
    #for WGrp in AllWeeksGroupsList:
    for WGrp in range(2,3):
        
        GroupDFSP = OriginalDataDFSP.drop('time_local').filter(OriginalDataDFSP.WeeksGroup==WGrp)
        TestWeeksIdsList = sorted([x.WeekOfYear for x in GroupDFSP.select('WeekOfYear').distinct().collect()])
        TrainWeeksIdsList = list(set(AllWeeksIdsList) - set(TestWeeksIdsList))
                                                               
        
        for LocId in LocIdsList:
            #if LocId>20:
            #    break
            # gather all the data corresponding to the training group and the corresponding location
            # start with the location
            OriginalDataLocSP = OriginalDataDFSP.filter(OriginalDataDFSP.LocationId == LocId)


            #start_time = time.time()
            referenceDF = FilteredSignatureDF.loc[(FilteredSignatureDF['LocationId']==LocId) & (FilteredSignatureDF['WeeksGroup']==WGrp)].drop(columns=['LocationId','WeeksGroup'])
            #print("grabbing the ref time performed in " + str((time.time() - start_time)) + " seconds")

            # then the corresponding weeks. I believe that it's more convenient by unioning stuff
            #start_time = time.time()

            #OriginalDataDFList = []
            ErrorsDFsList = []
            for wk in TrainWeeksIdsList:
                wkDF = OriginalDataLocSP.filter(OriginalDataLocSP.WeekOfYear==wk).drop('time_utc', 'time_local').toPandas()
                wkDF = wkDF.set_index('MinuteWithinWeek')
                wkDF = wkDF.reindex(range(0,24*7*60), fill_value=0).assign(WeekOfYear=wk, LocationId=LocId, WeeksGroup=WGrp).fillna(0)

                for m in metrics:
                    wkDF[m] = wkDF[m] - referenceDF[m]
                    #wkDF[m] = abs(wkDF[m] - referenceDF[m])

                wkDF.reset_index(inplace=True)

                #print("length wkDF :" + str(len(wkDF.index)))

                ErrorsDFsList.append( wkDF )

            errors = pd.concat(ErrorsDFsList)


            #distributions = offlineMLbyPL.get_distrib_params(errors, meta=meta, metrics=metrics)
            distribution = pd.DataFrame(np.nan, index=['thresh', 'proba', 'nvalues', 'k','loc','theta'], columns=['WeeksGroup', 'LocationId', *metrics])
            distrib = fit_distribution_eric_local(errors, metrics)
            for m in metrics:
                for k in range(0,6):
                    mLoc = distribution.columns.get_loc(m)
                    distribution.iloc[k,mLoc] = distrib[m][k]

            distribution = distribution.assign(LocationId=LocId, WeeksGroup=WGrp)
            distribution.index.names = ['GammaParam']

            #alrDF = compute_alr_eric_local(AbsErrors, GammaLawDF, metrics)
            alrDF = compute_alr_eric_local(errors, distribution, metrics)

            distribution.reset_index(inplace=True)
            distributionsDFList.append(distribution)


            totalALR.append(alrDF)

            #break


            pbar.update(1)
        
            
distributionsDF = pd.concat(distributionsDFList)
totalALRDF = pd.concat(totalALR)



#print(distributionsDF.info())
#print(distributionsDF.describe())
#print(distributionsDF)


#distributionsDF.to_parquet(path=ParquetFilesDistribsLoc, partition_cols=['WeeksGroup','LocationId'], index=False)



Signature extraction: 100%|██████████| 1026/1026 [27:59<00:00,  1.64s/it]


In [18]:
#totalALRDF['time_utc'] = pd.to_datetime(totalALRDF['time_utc'])

#ParquetFilesDistribsLoc = '/WORKSPACE/Pierre/tests/' + DatasetPrefix + '_' + LocationPrefix + '_distribs300LocIdsGroup012/'
totalALRDF.to_parquet(path=ParquetFilesALRsLoc, partition_cols=['WeeksGroup', 'LocationId'], index=False)

In [15]:
#ParquetFilesALRsLoc = '/WORKSPACE/Pierre/tests/' + DatasetPrefix + '_' + LocationPrefix + '_ALR0fill300LocIdsGroup012/'
distributionsDF.to_parquet(path=ParquetFilesDistribsLoc, partition_cols=['WeeksGroup', 'LocationId'], index=False)



In [20]:
# compensate for the fact that we have to compute the totalALDRF thing in 3 different times to store the result...

totalALRDF = spark.read.parquet(ParquetFilesALRsLoc).toPandas()

distributionsDF = spark.read.parquet(ParquetFilesDistribsLoc).toPandas()


ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36579)
Traceback (most recent call last):
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:36579)

In [10]:
distributionsDF.info(verbose=True)

totalALRDF.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
Int64Index: 6156 entries, 0 to 5
Data columns (total 11 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   GammaParam   6156 non-null   object 
 1   WeeksGroup   6156 non-null   int64  
 2   LocationId   6156 non-null   int64  
 3   Voice        3352 non-null   float64
 4   SMS_3G       3352 non-null   float64
 5   PS           3352 non-null   float64
 6   CS           3352 non-null   float64
 7   Call         3860 non-null   float64
 8   SMS_4G       3860 non-null   float64
 9   Service_Req  3857 non-null   float64
 10  HO           3860 non-null   float64
dtypes: float64(8), int64(2), object(1)
memory usage: 577.1+ KB
<class 'pandas.core.frame.DataFrame'>
Int64Index: 93078720 entries, 0 to 10079
Data columns (total 13 columns):
 #   Column            Dtype  
---  ------            -----  
 0   MinuteWithinWeek  int64  
 1   Voice             float64
 2   PS                float64
 3   SMS_3G        

In [30]:



def level1byPL(alr:pd.Series) -> float:
    """Sets a level 1 ALR threshold (i.e. pre-alert threshold) for the input data series.
    The threshold corresponds to the 2-sigma quantile of the ALR distribution.

    Parameters
    ----------
    alr: pandas.core.frame.Series
        The series of ALR values
    
    Returns
    -------
    thresh: float
        The ALR threshold of the series
    """

    #res = alr.replace([np.inf, -np.inf], np.nan).dropna(how="all")
    # 1 alert every 22 minutes was the parameter used by 
    # once per 4 hours -> 1 per 4*60 minutes
    #return res.quantile(1/240)
    return alr.quantile(0.00416667)

def level2byPL(alr:pd.Series) -> float:
    # 1 alert every 370 minutes
    """Sets a level 2 ALR threshold (i.e. alert threshold) for the input data series.
    The threshold corresponds to the 3-sigma quantile of the ALR distribution.

    Parameters
    ----------
    alr: pandas.core.frame.Series
        The series of ALR values
    
    Returns
    -------
    thresh: float
        The ALR threshold of the series
    """

    #res = alr.replace([np.inf, -np.inf], np.nan).dropna(how="all")
    #return res.quantile(1-0.9973)
    # once per day -> 1 per 24*60 minutes
    #return res.quantile(1/1440)
    return alr.quantile(0.0006944)

def level3byPL(alr:pd.Series) -> float:
    """Sets a level 3 ALR threshold (i.e. maximal alert threshold) for the input data series.
    The threshold corresponds to the 4-sigma quantile of the ALR distribution.

    Parameters
    ----------
    alr: pandas.core.frame.Series
        The series of ALR values
    
    Returns
    -------
    thresh: float
        The ALR threshold of the series
    """
    #1 alert every 15873 minute
    #res = alr.replace([np.inf, -np.inf], np.nan).dropna(how="all")
    #return res.quantile(1-0.999937)
    return alr.quantile(0.0000992)




#CsvFilesThresholdsLoc = 'exports/' + DatasetPrefix + '_thresholds_0fill2groups_' + LocationPrefix + '.csv'


# apply the offlineML functions to aggregate thresholds for each antenna
thresholds = totalALRDF.groupby(['WeeksGroup','LocationId']).ALR.agg([level1byPL, level2byPL, level3byPL])
thresholds.reset_index(inplace=True)

# don't forget to store the index or the following will be annoying
#thresholds.reset_index(inplace=True)
#thresholds.to_csv(CsvFilesThresholdsLoc)


In [31]:
#CsvFilesThresholdsLoc = '/WORKSPACE/Pierre/tests/' + DatasetPrefix + '_' + LocationPrefix + '_thresholds_300LocIdsGroup012.csv'
thresholds.to_csv(CsvFilesThresholdsLoc, index=False)

In [32]:
print(thresholds.info(verbose=True))
thresholds.describe()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 39 entries, 0 to 38
Data columns (total 5 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   WeeksGroup  39 non-null     int64  
 1   LocationId  39 non-null     int64  
 2   level1byPL  39 non-null     float64
 3   level2byPL  39 non-null     float64
 4   level3byPL  39 non-null     float64
dtypes: float64(3), int64(2)
memory usage: 1.6 KB
None


Unnamed: 0,WeeksGroup,LocationId,level1byPL,level2byPL,level3byPL
count,39.0,39.0,39.0,39.0,39.0
mean,1.0,11.692308,-3.67779,-6.483696,-10.334911
std,0.82717,4.840367,3.370149,5.777767,9.220322
min,0.0,4.0,-14.405279,-24.114274,-34.136648
25%,0.0,8.0,-5.526967,-9.248671,-14.862737
50%,1.0,12.0,-4.144289,-7.138496,-11.536166
75%,2.0,16.0,0.0,0.0,0.0
max,2.0,19.0,0.0,0.0,0.0


In [28]:
#print(LocIdsList)
# some verifications about what we have in the 

pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

distributionsDF[distributionsDF['GammaParam']=='theta'].info()

#print("threshold statistics")
#distributionsDF[distributionsDF['GammaParam']=='thresh'].drop(columns=['LocationId', 'WeeksGroup']).describe()

print("\n\nproba statistics")
print(distributionsDF[distributionsDF['GammaParam']=='proba'].drop(columns=['LocationId', 'WeeksGroup']).describe())

LocIdsInfosDF = pd.read_csv('exports/Cancan_Paris_LocInfos_AllData.csv')
#print(LocIdsInfosDF.info(verbose=True))

#SelectedLocIdsInfos = LocIdsInfosDF[LocIdsInfosDF['LocationId'].isin(SmallerLocIdsList)]
#print(SelectedLocIdsInfos.info(verbose=True))

print("\n\npossible non-zero values depending on the technology")
CountTechno = SelectedLocIdsInfos.groupby(['LocationId', 'TECHNO']).size().groupby('TECHNO').count()
print(CountTechno)


<class 'pandas.core.frame.DataFrame'>
Int64Index: 3078 entries, 5 to 5
Data columns (total 11 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   GammaParam   3078 non-null   object 
 1   WeeksGroup   3078 non-null   int64  
 2   LocationId   3078 non-null   int64  
 3   Voice        976 non-null    float64
 4   SMS_3G       976 non-null    float64
 5   PS           977 non-null    float64
 6   CS           976 non-null    float64
 7   Call         1342 non-null   float64
 8   SMS_4G       1343 non-null   float64
 9   Service_Req  1310 non-null   float64
 10  HO           1343 non-null   float64
dtypes: float64(8), int64(2), object(1)
memory usage: 288.6+ KB


proba statistics
             Voice       SMS_3G           PS           CS         Call       SMS_4G  Service_Req           HO
count  3078.000000  3078.000000  3078.000000  3078.000000  3078.000000  3078.000000  3078.000000  3078.000000
mean      0.013033     0.009380     0.008660   

NameError: name 'SelectedLocIdsInfos' is not defined

In [90]:
thresholds.describe()


Unnamed: 0,LocationId,WeeksGroup,level1byPL,level2byPL,level3byPL
count,300.0,300.0,300.0,300.0,300.0
mean,1061.803333,0.0,-5.195179,-11.174876,-18.086576
std,110.215135,0.0,3.778842,6.997304,9.384837
min,875.0,0.0,-26.323358,-56.610089,-65.210885
25%,961.75,0.0,-6.574128,-14.145312,-22.871932
50%,1064.0,0.0,-4.398969,-9.676692,-16.536602
75%,1156.5,0.0,-2.668047,-6.774801,-11.970358
max,1251.0,0.0,-0.000121,-0.000121,-0.000123
