# Read and Clean MCap (Market Capitalization) Data with PySpark

In [1]:
# Warning control: decide whether Python warnings are displayed or suppressed.
import warnings
import traceback

# Switch between 'default' (show warnings) and 'ignore' (suppress them).
warnings.simplefilter('ignore')

# Debug flag: True shows extended error messages; set it to False to turn
# debugging mode off.
debug = True

In [29]:
import os
import sys
from datetime import datetime, date, timedelta

sys.path.insert(1,"/home/nuwan/workspace/rezaware/")
import rezaware as reza
from utils.modules.etl.load import sparkDBwls as sdb
from utils.modules.etl.transform import sparkCleanNRich as scne
from mining.modules.assets.etp import logReturns as log
from utils.modules.ml.timeseries import rollingstats as stats

# Re-initialise the project modules and create the class instances used below.
if debug:
    # In debug mode every project module is reloaded, in the order given,
    # and each name is rebound to the value returned by importlib.reload.
    import importlib
    reza, log, sdb, scne, stats = (
        importlib.reload(_module) for _module in (reza, log, sdb, scne, stats)
    )

# Description string handed to every class constructor.
__desc__ = "analyze crypto market capitalization time series data"

# Not instantiated at present:
# clsSDB = sdb.SQLWorkLoads(desc=__desc__)
clsSCNR = scne.Transformer(desc=__desc__)
clsROR = log.RatioOfReturns(desc=__desc__)
clsStat = stats.RollingStats(desc=__desc__)

# Optional properties - if not specified the class will use its default values.
# prop_kwargs = {"WRITE_TO_TMP":True,   # necessary to emulate the etl dag
#               }
print("\nClass initialization and load complete!")

All functional APP-libraries in REZAWARE-package of REZAWARE-module imported successfully!
All functional LOGRETURNS-libraries in ETP-package of ASSETS-module imported successfully!
All functional SPARKDBWLS-libraries in LOAD-package of ETL-module imported successfully!
All functional SPARKCLEANNRICH-libraries in TRANSFORM-package of ETL-module imported successfully!
All packages in utils ml timeseries RollingStats imported successfully!
logReturns Class initialization complete

Class initialization and load complete!


## Read data from mcap_past
We query `warehouse.mcap_past` for assets whose market capitalization (`mcap_value`) is greater than 1.0 million within the selected date range. Any missing values are imputed with the mean value.

In [4]:
# Date window for the extract; both ends are inclusive because the SQL below
# uses BETWEEN.
_from_date = '2022-01-01'
_to_date = '2022-01-02'

# Select every mcap_past record inside the date window whose market cap
# exceeds one million. The adjacent literals are joined into a single string.
# Earlier variant of the date filter, kept for reference:
#   where mcap_date >= '{_from_date}' and mcap_date <= '{_to_date}'
_query = (
    "select * from warehouse.mcap_past "
    f"where mcap_date between '{_from_date}' and '{_to_date}' "
    "and mcap_value > 1000000"
)

# Options forwarded to read_n_clean_mcap alongside the query.
# NOTE(review): the meaning of each option is defined by the logReturns
# module, which is not visible here - confirm against that module.
_kwargs = dict(
    TABLENAME='warehouse.mcap_past',
    COLUMN='mcap_date',
    FROMDATETIME=_from_date,
    TODATETIME=_to_date,
    PARTITIONS=2,
    AGGREGATE='avg',
    PIVCOLUMNS=[
        'cofix', 'paypolitan-token', 'raven-protocol',
        'nft-index', 'beldex', 'mt-pelerin-shares',
    ],
)

# print(clsSpark.dbSchema)
# Alternative call without an explicit query:
# mcap_sdf = clsROR.read_n_clean_mcap(**_kwargs)
mcap_sdf = clsROR.read_n_clean_mcap(query=_query, **_kwargs)

# Report the size of the loaded data.
print(
    "Loaded %d rows and %d columns"
    % (mcap_sdf.count(), len(mcap_sdf.columns))
)

Wait a moment, retrieving data ...
23/01/20 07:26:55 WARN Utils: Your hostname, FarmRaiderTester resolves to a loopback address: 127.0.1.1; using 192.168.124.15 instead (on interface enp2s0)
23/01/20 07:26:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/01/20 07:26:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/01/20 07:26:56 WARN FileSystem: Cannot load filesystem: java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem Unable to get public no-arg constructor
23/01/20 07:26:56 WARN FileSystem: java.lang.NoClassDefFoundError: com/google/api/client/auth/oauth2/Credential
23/01/20 07:26:56 WARN FileSystem: java.lang.ClassNotFoundException: com.google.api.client.auth.oauth2.Credential


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

23/01/20 07:30:01 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Stage 17:>                                                         (0 + 1) / 1]

23/01/20 07:30:21 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB


[Stage 20:>                                                         (0 + 1) / 1]

23/01/20 07:30:45 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB


[Stage 31:>                                                         (0 + 1) / 1]

23/01/20 07:35:16 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


[Stage 41:>                                                         (0 + 1) / 1]

23/01/20 07:37:02 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


[Stage 51:>                                                         (0 + 1) / 1]

23/01/20 07:39:15 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


[Stage 54:>                                                         (0 + 1) / 1]

Loaded 3320 rows and 3 columns


                                                                                

## Compute LogROR for all assets

In [7]:
# Column names passed to get_log_ror for the previous-value, difference and
# log-ROR columns.
# NOTE(review): the table displayed by this cell names the previous-value
# column `mcap_value_prev_val`, not `mcap_prev_val` - confirm how
# PREVALCOLNAME is used by get_log_ror.
kwargs = dict(
    PREVALCOLNAME='mcap_prev_val',
    DIFFCOLNAME='mcap_diff',
    LOGCOLNAME='log_ror',
)

# Compute the log ROR of mcap_value, using asset_name as the partition column.
_mcap_log_ror, _log_col = clsROR.get_log_ror(
    data=mcap_sdf, num_col_name="mcap_value", part_column='asset_name', **kwargs,
)

# Display only the rows that have a non-null log_ror value.
_mcap_log_ror.filter(_mcap_log_ror["log_ror"].isNotNull()).show()

[Stage 81:>                                                         (0 + 1) / 1]

23/01/20 15:20:33 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


                                                                                

23/01/20 15:20:44 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


[Stage 87:>                                                         (0 + 1) / 1]

23/01/20 15:20:50 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB


[Stage 96:>                                                         (0 + 0) / 1]

23/01/20 15:20:55 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB


                                                                                

+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
| mcap_date|          asset_name|          mcap_value| mcap_value_prev_val|           mcap_diff|             log_ror|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|2022-01-02|            switcheo|25959201.75722492...|11336695.15087185...|14622506.60635306...|-0.35980487434341657|
|2022-01-02|           primecoin|4869520.968668153...|2382648.070384798...|2486872.898283354...| -0.3104265594101802|
|2022-01-02|           cypherium|11824476.85075602...|6272519.311846281...|5551957.538909744...| -0.2753399906925486|
|2022-01-02|           micropets|38036463.67628028...|23995041.05886446...|14041422.61741581...|-0.20007871162365293|
|2022-01-02|               eqifi|35476164.52149334...|23447442.77929092...|12028721.74220241...|-0.17984124933339615|
|2022-01-02|        civilization|36663264.25913747...|24

## Weighted Portfolio

In [32]:
# Count the records in the weighted-portfolio data.
# NOTE: `_wr_data` is assigned by the `clsROR.update_weighted_portfolio(...)`
# cell that appears further down in this notebook, so that cell must be
# executed before this one.
_wr_data.count()

[Stage 375:>                                                        (0 + 1) / 1]

23/01/20 21:14:44 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


                                                                                

23/01/20 21:14:48 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


[Stage 381:>                                                        (0 + 1) / 1]

23/01/20 21:14:53 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB


[Stage 385:>                                                        (0 + 1) / 1]

23/01/20 21:14:55 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB


                                                                                

1660

In [30]:
# Update the weighted portfolio from the log-ROR data, keyed on mcap_date and
# valued by log_ror.
# NOTE(review): `kwargs` here is whatever dict was most recently bound to that
# name in the notebook session - confirm it holds the options this call expects.
# NOTE(review): the semantics of topN and size are defined by
# update_weighted_portfolio, which is not visible here.
_wr_dates, _wr_data = clsROR.update_weighted_portfolio(
    data=_mcap_log_ror, date_col='mcap_date', val_col='log_ror',
    topN=23, size=100, **kwargs,
)

[Stage 280:>                                                        (0 + 1) / 1]

23/01/20 20:52:01 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


                                                                                

23/01/20 20:52:05 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


[Stage 286:>                                                        (0 + 1) / 1]

23/01/20 20:52:10 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB


[Stage 298:>                                                        (0 + 1) / 1]

23/01/20 20:53:26 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


                                                                                

23/01/20 20:53:30 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


[Stage 304:>                                                        (0 + 1) / 1]

23/01/20 20:53:36 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB


[Stage 308:>                                                        (0 + 1) / 1]

23/01/20 20:53:37 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB


[Stage 322:>                                                        (0 + 1) / 1]

23/01/20 20:54:52 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


                                                                                

23/01/20 20:54:55 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


[Stage 328:>                                                        (0 + 1) / 1]

23/01/20 20:55:00 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB


[Stage 332:>                                                        (0 + 1) / 1]

23/01/20 20:55:02 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB


                                                                                

23/01/20 20:55:03 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB
23/01/20 20:55:05 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB


[Stage 351:>                                                        (0 + 1) / 1]

23/01/20 20:56:19 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


                                                                                

23/01/20 20:56:23 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


[Stage 357:>                                                        (0 + 1) / 1]

23/01/20 20:56:27 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB


[Stage 361:>                                                        (0 + 1) / 1]

23/01/20 20:56:29 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB


                                                                                --- Logging error ---
Traceback (most recent call last):
  File "/usr/lib/python3.8/logging/__init__.py", line 1085, in emit
    msg = self.format(record)
  File "/usr/lib/python3.8/logging/__init__.py", line 929, in format
    return fmt.format(record)
  File "/usr/lib/python3.8/logging/__init__.py", line 668, in format
    record.message = record.getMessage()
  File "/usr/lib/python3.8/logging/__init__.py", line 373, in getMessage
    msg = msg % self.args
TypeError: %d format: a number is required, not str
Call stack:
  File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/nuwan/.local/lib/python3.8/site-packages/ipykernel_launcher.py", line 17, in <module>
    app.launch_new_instance()
  File "/home/nuwan/.local/lib/python3.8/site-packages/

## Simple Moving Average (SMA)

In [62]:
# Rolling-window settings: window length 7 with unit 'DAY', using mcap_date as
# the datetime attribute.
# NOTE(review): this rebinds `kwargs`, the same name the log-ROR and
# weighted-portfolio cells use for their own options.
kwargs = dict(
    DATETIMEATTR='mcap_date',
    WINLENGTH=7,
    WINUNIT='DAY',
)

# NOTE(review): the log-ROR table displayed earlier in this notebook lists
# `mcap_diff` and `log_ror` but no `diff` column - confirm the column name
# against the current contents of _mcap_log_ror.
_sma_sdf = clsStat.simple_moving_stats(
    column='diff',        # name of the column the rolling computation is applied to
    stat_op="mean",       # statistic to compute: sum, mean or standard deviation
    data=_mcap_log_ror,   # input data set
    **kwargs,
)


In [63]:
# Display the leading rows of the rolling-statistics result assigned to
# `_sma_sdf` in the preceding cell.
_sma_sdf.show()

23/01/18 16:08:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/01/18 16:08:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/01/18 16:08:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


[Stage 289:>                                                        (0 + 1) / 1]

23/01/18 16:09:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/01/18 16:09:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


[Stage 291:>                                                        (0 + 1) / 1]

23/01/18 16:09:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/01/18 16:09:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/01/18 16:09:22 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB


[Stage 294:>                                                        (0 + 1) / 1]

23/01/18 16:09:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/01/18 16:09:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/01/18 16:09:29 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB


[Stage 298:>                                                        (0 + 1) / 1]

23/01/18 16:09:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/01/18 16:09:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/01/18 16:09:32 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB


[Stage 303:>                                                        (0 + 1) / 1]

+-------------------+------------------+--------------------+------------------+--------------------+--------------------+
|          mcap_date|        asset_name|          mcap_value|         mcap_diff|                diff|   rolling_mean_diff|
+-------------------+------------------+--------------------+------------------+--------------------+--------------------+
|2022-01-01 00:00:00|            0chain|18581739.78078547...| 7.290301186807742|-930233.240037870...|66178035.76083997...|
|2022-01-01 00:00:00|              1_up|6580366.707037671...|   6.8616358287375|-691331.205561129...|66178035.76083997...|
|2022-01-01 00:00:00|              1art|20450189.55773175...| 7.363364636916094|-2636658.08537944...|66178035.76083997...|
|2022-01-01 00:00:00|             1inch|1025227565.328557...| 8.979442079049766|71461032.82505580...|66178035.76083997...|
|2022-01-01 00:00:00|            1world|2254182.045747336...| 6.312876221139709|198877.3184413719...|66178035.76083997...|
|2022-01-01 00:0

                                                                                