---
# Daten von Git importieren
---


In [1]:
#Daten von Git Repository importieren!
!git clone https://github.com/sleuoth-hof/trading_2019.git

#!ls trading_2019/Warehouse/Exportdaten_PPO

#!ls -alh

Cloning into 'trading_2019'...
remote: Enumerating objects: 3158, done.[K
remote: Counting objects: 100% (3158/3158), done.[K
remote: Compressing objects: 100% (3081/3081), done.[K
remote: Total 7908 (delta 157), reused 840 (delta 69), pack-reused 4750[K
Receiving objects: 100% (7908/7908), 28.14 MiB | 23.00 MiB/s, done.
Resolving deltas: 100% (263/263), done.


---
# Frameworks installieren
---

In [2]:
#Java
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
#Spark
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
#findspark
!pip install findspark
#ts.flint.timeSeries Dataframe. ähnlich eines normalen DataFrames mit speziellen Funktionen für Zeitreihen.. jeder DataFrame braucht eine Spalte mit einem Zeit-Datentypen.
!pip install ts ts-flint



Collecting findspark
  Downloading https://files.pythonhosted.org/packages/b1/c8/e6e1f6a303ae5122dc28d131b5a67c5eb87cbf8f7ac5b9f87764ea1b1e1e/findspark-1.3.0-py2.py3-none-any.whl
Installing collected packages: findspark
Successfully installed findspark-1.3.0
Collecting ts
  Downloading https://files.pythonhosted.org/packages/26/dd/76551999e37032441d1190d7df27c30fc75adbb203dacb91a2a72e8b7202/ts-0.5.1.tar.gz
Collecting ts-flint
  Downloading https://files.pythonhosted.org/packages/bb/f9/afab3946adefd527ca376aa71fdb6cd1ff787940c3f2467de382a8697c9d/ts_flint-0.6.0-py3-none-any.whl
Building wheels for collected packages: ts
  Building wheel for ts (setup.py) ... [?25l[?25hdone
  Created wheel for ts: filename=ts-0.5.1-cp36-none-any.whl size=14366 sha256=fe2fc97e77b4f0170417687cfb99d73a34b0f93259c84698f6c126544824af64
  Stored in directory: /root/.cache/pip/wheels/21/63/72/2341c4dfe0ade30e93b7e2a924abb7149ee245384fd3b9a9e4
Successfully built ts
Installing collected packages: ts, ts-flint
Su

---
# Imports
---

In [0]:
#OS Funktionalität
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

#Spark & SparkSQL
import findspark

findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import input_file_name, col, collect_list, concat_ws, udf
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql.types import StructType, StructField, DateType
from pyspark.sql.window import Window
from pyspark.sql.functions import concat, col, lit

#
#import org.apache.spark.sql.Encoders

#Initialisierung SparkSQL
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)



---
#Dataframe erzeugen
---

In [4]:
#schema der csv Datei festlegen; kann nicht abgeleitet werden, da Tab- (nicht Komma-) separiert
#PPO_RESULT umbenannt
schema = StructType([
                     StructField("filename", StringType(), True),
                     StructField("date", DateType(), True),
                     StructField("open", DoubleType(), True),
                     StructField("close", DoubleType(), True),
                     StructField("high", DoubleType(), True),
                     StructField("low", DoubleType(), True),
                     StructField("volume", DoubleType(), True),
                     StructField("IndEmpfehlung", StringType(), True)])

df_spark = spark.read.csv('trading_2019/Warehouse/Exportdaten_PPO/*.csv', schema = schema, sep = ',', header = True)
#df_spark.printSchema()
df_spark.show(12)
df_spark.count()

+--------+----------+--------------------+--------------------+--------------------+--------------------+--------------+-------------+
|filename|      date|                open|               close|                high|                 low|        volume|IndEmpfehlung|
+--------+----------+--------------------+--------------------+--------------------+--------------------+--------------+-------------+
|  omnusd|2019-06-26|              2.0501|              2.0501|              2.0501|              2.0501|   20.45383732|           V2|
|  btcusd|2019-06-26|             12876.0|             12876.0|             12894.0|             12876.0|   24.61357214|           H0|
|  dtabtc|2019-06-26|              1.1E-7|              1.1E-7|              1.1E-7|              1.1E-7|39921.60654521|           K1|
|  neojpy|2019-06-26|              2043.5|              2043.5|              2043.5|              2043.5|   10.91652872|           K1|
|  sngeth|2019-06-26|            4.407E-5|            4

192863


---
# Kurse vergleichen
---

In [0]:


#date_format(date, 'y-M-d') day,
df_spark.createOrReplaceTempView("rohdaten")
basis_daten = spark.sql(
  "SELECT filename, date, IndEmpfehlung, volume from rohdaten")
#basis_daten.show(5)
basis_daten.createOrReplaceTempView("basis_daten")


#test1 = spark.sql(
#  """SELECT filename, count(*) anzahl, min(date), max(date) FROM basis_daten WHERE date between '2019-08-01' and '2019-08-30'
#   GROUP BY filename
#   ORDER BY anzahl desc
#  """)
#test1.createOrReplaceTempView("test1")
#test1.show(5)

#Aus Performance Gründen werden hier 5 Kryptowährungen ausgewählt.
test2 = spark.sql("""SELECT * FROM basis_daten WHERE filename IN ('etheur','atobtc','btceur','pasusd','btcusd')""")
#test2.show()
#test2.count()
test2.createOrReplaceTempView("test2")



In [6]:
#Auswahl der benötigten Parameter
#Ebenfalls aus Perfomance Gründen wird sich hier auf einen Zeitraum von 30 Tagen beschränkt

vergleich = spark.sql("""SELECT a.IndEmpfehlung indA, b.IndEmpfehlung indB, a.filename nameA, a.date, b.date dateB, b.filename nameB, b.volume volumeB
    FROM basis_daten a JOIN basis_daten b
    ON (a.date = date_add(b.date, 1) AND a.filename <> b.filename)
    WHERE a.date BETWEEN '2019-08-01' AND '2019-08-30'
    ORDER BY nameA, date asc
    """)
vergleich.show(5)
vergleich.createOrReplaceTempView("vergleich")
vergleich.count()


+----+----+------+----------+----------+------+------------------+
|indA|indB| nameA|      date|     dateB| nameB|           volumeB|
+----+----+------+----------+----------+------+------------------+
|  H0|  H0|atobtc|2019-08-01|2019-07-31|etheur|        22.1075211|
|  H0|  H0|atobtc|2019-08-01|2019-07-31|btceur|        0.03733976|
|  H0|  H0|atobtc|2019-08-01|2019-07-31|pasusd|      806.83109001|
|  H0|  H0|atobtc|2019-08-01|2019-07-31|btcusd|        0.18294308|
|  H0|  H0|atobtc|2019-08-02|2019-08-01|btcusd|        0.20483681|
|  H0|  H0|atobtc|2019-08-02|2019-08-01|pasusd|          13999.98|
|  H0|  H0|atobtc|2019-08-02|2019-08-01|btceur|             0.005|
|  H0|  H0|atobtc|2019-08-02|2019-08-01|etheur|        3.45077406|
|  H0|  H0|atobtc|2019-08-03|2019-08-02|btceur|0.7109340000000001|
|  H0|  H0|atobtc|2019-08-03|2019-08-02|btcusd|             0.035|
|  H0|  K1|atobtc|2019-08-03|2019-08-02|pasusd|             799.0|
|  H0|  H0|atobtc|2019-08-03|2019-08-02|etheur|        10.0459

600

In [0]:
#Vergleich der beiden Signale und Bewertung
#Hilfsstrings um SQL statement übersichtlicher zu machen
#gleiche Indikatorklasse, d.h. beide Indikatoren in H, K oder V

sameClass = "CASE " \
"   WHEN (substring(indA,2,1) = substring(indB,2,1) ) THEN 0 " + "\n" \
"   WHEN (substring(indA,2,1) = '0' AND substring(indB,2,1) = '2') THEN 2 " + "\n" \
"   WHEN (substring(indA,2,1) = '1' AND substring(indB,2,1) = '3') THEN 2 " + "\n" \
"   WHEN (substring(indA,2,1) = '2' AND substring(indB,2,1) = '0') THEN 2 " + "\n" \
"   WHEN (substring(indA,2,1) = '3' AND substring(indB,2,1) = '1') THEN 2 " + "\n" \
"   WHEN (substring(indA,2,1) = '0' AND substring(indB,2,1) = '3') THEN 3 " + "\n" \
"   WHEN (substring(indA,2,1) = '3' AND substring(indB,2,1) = '0') THEN 3 " + "\n" \
"   ELSE 1  END" + "\n" 


#Indikatoren nur um eine Klasse verschoben, d.h. indA = H und indB = K || V usw. 
classP1 = "CASE " \
"   WHEN (substring(indA,2,1) = '0' ) THEN (CAST(substring(indB,2,1) AS INT) + 5) " + "\n" \
"   WHEN (substring(indA,2,1) = '1' ) THEN (CAST(substring(indB,2,1) AS INT) + 6) " + "\n" \
"   WHEN (substring(indA,2,1) = '2' ) THEN (CAST(substring(indB,2,1) AS INT) + 7) " + "\n" \
"   WHEN (substring(indA,2,1) = '3' ) THEN (CAST(substring(indB,2,1) AS INT) + 8) END" + "\n"

#Indikator gegensätzlich, d.h. indA = V und indB = K und umgekehrt
classP2 = "CASE " \
"   WHEN (substring(indA,2,1) = '0' ) THEN (CAST(substring(indB,2,1) AS INT) + 10) " + "\n" \
"   WHEN (substring(indA,2,1) = '1' ) THEN (CAST(substring(indB,2,1) AS INT) + 11) " + "\n" \
"   WHEN (substring(indA,2,1) = '2' ) THEN (CAST(substring(indB,2,1) AS INT) + 12) " + "\n" \
"   WHEN (substring(indA,2,1) = '3' ) THEN (CAST(substring(indB,2,1) AS INT) + 13) END" + "\n"

In [8]:
#Vergleich der Buchstaben
signalVergleich = "SELECT *," \
" CASE " \
"   WHEN (substring(indA,1,1) = substring(indB,1,1)) THEN " + sameClass + \
"   WHEN (substring(indA,1,1) = 'V' AND substring(indB,1,1) = 'K') " \
"   OR (substring(indA,1,1) = 'K' AND substring(indB,1,1) = 'V') THEN " + classP2 + \
"   ELSE " + classP1 + \
"   END AS signal_vergleich" \
" FROM vergleich"

testvgl = spark.sql(signalVergleich)
testvgl.show()
testvgl.createOrReplaceTempView("signalVergleich")

+----+----+------+----------+----------+------+------------------+----------------+
|indA|indB| nameA|      date|     dateB| nameB|           volumeB|signal_vergleich|
+----+----+------+----------+----------+------+------------------+----------------+
|  H0|  H0|atobtc|2019-08-01|2019-07-31|btceur|        0.03733976|               0|
|  H0|  H0|atobtc|2019-08-01|2019-07-31|etheur|        22.1075211|               0|
|  H0|  H0|atobtc|2019-08-01|2019-07-31|pasusd|      806.83109001|               0|
|  H0|  H0|atobtc|2019-08-01|2019-07-31|btcusd|        0.18294308|               0|
|  H0|  H0|atobtc|2019-08-02|2019-08-01|btcusd|        0.20483681|               0|
|  H0|  H0|atobtc|2019-08-02|2019-08-01|pasusd|          13999.98|               0|
|  H0|  H0|atobtc|2019-08-02|2019-08-01|btceur|             0.005|               0|
|  H0|  H0|atobtc|2019-08-02|2019-08-01|etheur|        3.45077406|               0|
|  H0|  H0|atobtc|2019-08-03|2019-08-02|etheur|        10.0459161|          

In [9]:
#Durchschnitt über die letzten drei Tage ermitteln
vergleich_average = "SELECT d.*, avg(signal_vergleich) over (PARTITION BY nameA, nameB order by date rows BETWEEN 2 PRECEDING AND CURRENT ROW) as Mittelwert " \
"FROM signalVergleich d"
vergleich_avg = spark.sql(vergleich_average)
vergleich_avg.show()
vergleich_avg.createOrReplaceTempView("vergleichAVG")

+----+----+------+----------+----------+------+------------------+----------------+------------------+
|indA|indB| nameA|      date|     dateB| nameB|           volumeB|signal_vergleich|        Mittelwert|
+----+----+------+----------+----------+------+------------------+----------------+------------------+
|  H0|  H0|atobtc|2019-08-01|2019-07-31|btcusd|        0.18294308|               0|               0.0|
|  H0|  H0|atobtc|2019-08-02|2019-08-01|btcusd|        0.20483681|               0|               0.0|
|  H0|  H0|atobtc|2019-08-03|2019-08-02|btcusd|             0.035|               0|               0.0|
|  K1|  H0|atobtc|2019-08-04|2019-08-03|btcusd|        1.24550528|               6|               2.0|
|  H0|  H0|atobtc|2019-08-05|2019-08-04|btcusd|        1.20738545|               0|               2.0|
|  H0|  K1|atobtc|2019-08-06|2019-08-05|btcusd|        0.96756822|               6|               4.0|
|  H0|  V2|atobtc|2019-08-07|2019-08-06|btcusd|2.2642137000000004|       

In [0]:
#Tests für SQL-Anfragen -> werden nicht benötigt für weiteren Verlauf
#t = spark.sql(
#    "SELECT d.nameA, d.date, MIN(Mittelwert) AS minimum, first(d.nameB) " \
#    "FROM vergleichAVG d " \
#    "GROUP BY d.nameA, d.date " \
#    "ORDER BY date, nameA"
#)
t = spark.sql(
    "SELECT DISTINCT d.*  " \
    "FROM vergleichAVG d  " \
    "WHERE (d.nameA, d.date, d.Mittelwert) IN ( " \
        "SELECT nameA, date, min(Mittelwert) " \
        "FROM vergleichAVG m  " \
        "GROUP BY nameA, date ) " \
#      ") e " \
#      "ON e.nameA = d.nameA AND e.date = d.date " \
#    "WHERE e.minimum = d.Mittelwert " \
    "ORDER BY date "
)
#t = spark.sql(
#    "SELECT a.* FROM vergleichAVG a " \
#    "WHERE (a.date, a.nameA, min(Mittelwert) AS Minimum) in (SELECT b.date, b.nameA, min(Mittelwert) AS Minimum FROM vergleichAVG b GROUP BY b.nameA, b.date)"
#)

In [10]:
#Auswahl der Minima für jeden Ausgangskurs
u = spark.sql(
    "SELECT * FROM ( " \
    "SELECT d.*, min(Mittelwert) over (PARTITION BY nameA, date) AS minimum " \
    "FROM vergleichAVG d " \
    "ORDER BY nameA, date ) " \
    "WHERE Mittelwert = minimum "
)
u.show()
u.createOrReplaceTempView("u")
u.count()

+----+----+------+----------+----------+------+-------------------+----------------+------------------+------------------+
|indA|indB| nameA|      date|     dateB| nameB|            volumeB|signal_vergleich|        Mittelwert|           minimum|
+----+----+------+----------+----------+------+-------------------+----------------+------------------+------------------+
|  H0|  H0|atobtc|2019-08-01|2019-07-31|btcusd|         0.18294308|               0|               0.0|               0.0|
|  H0|  H0|atobtc|2019-08-01|2019-07-31|pasusd|       806.83109001|               0|               0.0|               0.0|
|  H0|  H0|atobtc|2019-08-01|2019-07-31|etheur|         22.1075211|               0|               0.0|               0.0|
|  H0|  H0|atobtc|2019-08-01|2019-07-31|btceur|         0.03733976|               0|               0.0|               0.0|
|  H0|  H0|atobtc|2019-08-02|2019-08-01|pasusd|           13999.98|               0|               0.0|               0.0|
|  H0|  H0|atobt

302

In [12]:
#Falls mehrere Minima existieren -> Auswahl des Kurses mit dem höchsten Volumen
v = spark.sql(
    "SELECT * FROM ( " \
    "SELECT d.*, max(volumeB) over (PARTITION BY nameA, date) as maxVolume " \
    "FROM u d " \
    "ORDER BY nameA, date ) " \
    "WHERE volumeB = maxVolume "
)
v.show()
v.createOrReplaceTempView("v")
#v.count()

+----+----+------+----------+----------+------+------------------+----------------+------------------+------------------+------------------+
|indA|indB| nameA|      date|     dateB| nameB|           volumeB|signal_vergleich|        Mittelwert|           minimum|         maxVolume|
+----+----+------+----------+----------+------+------------------+----------------+------------------+------------------+------------------+
|  H0|  H0|atobtc|2019-08-01|2019-07-31|pasusd|      806.83109001|               0|               0.0|               0.0|      806.83109001|
|  H0|  H0|atobtc|2019-08-02|2019-08-01|pasusd|          13999.98|               0|               0.0|               0.0|          13999.98|
|  H0|  H0|atobtc|2019-08-03|2019-08-02|etheur|        10.0459161|               0|               0.0|               0.0|        10.0459161|
|  K1|  H0|atobtc|2019-08-04|2019-08-03|etheur|              2.21|               6|               2.0|               2.0|              2.21|
|  H0|  H0|at

---
# Auswertung
---

In [13]:
w = spark.sql(
    "SELECT *,  " \
    "CASE " \
    "   WHEN (minimum < 5.0) THEN nameB " \
    "   ELSE 'Folgt keinem Kurs' " \
    "END AS Folgt_Kurs " \
    "FROM v"
)
w.show()
w.createOrReplaceTempView("w")
w.count()

+----+----+------+----------+----------+------+------------------+----------------+------------------+------------------+------------------+----------+
|indA|indB| nameA|      date|     dateB| nameB|           volumeB|signal_vergleich|        Mittelwert|           minimum|         maxVolume|Folgt_Kurs|
+----+----+------+----------+----------+------+------------------+----------------+------------------+------------------+------------------+----------+
|  H0|  H0|atobtc|2019-08-01|2019-07-31|pasusd|      806.83109001|               0|               0.0|               0.0|      806.83109001|    pasusd|
|  H0|  H0|atobtc|2019-08-02|2019-08-01|pasusd|          13999.98|               0|               0.0|               0.0|          13999.98|    pasusd|
|  H0|  H0|atobtc|2019-08-03|2019-08-02|etheur|        10.0459161|               0|               0.0|               0.0|        10.0459161|    etheur|
|  K1|  H0|atobtc|2019-08-04|2019-08-03|etheur|              2.21|               6|     

150

In [14]:
x = spark.sql(
    "SELECT a.*, b.IndEmpfehlung indE " \
    "FROM w a LEFT JOIN rohdaten b " \
    "ON (a.Folgt_Kurs = b.filename AND a.date = b.date) "
)
#x.show()
x.createOrReplaceTempView("x")
x.count()


+----+----+------+----------+----------+------+------------------+----------------+------------------+------------------+------------------+-----------------+----+
|indA|indB| nameA|      date|     dateB| nameB|           volumeB|signal_vergleich|        Mittelwert|           minimum|         maxVolume|       Folgt_Kurs|indE|
+----+----+------+----------+----------+------+------------------+----------------+------------------+------------------+------------------+-----------------+----+
|  H0|  H0|atobtc|2019-08-01|2019-07-31|pasusd|      806.83109001|               0|               0.0|               0.0|      806.83109001|           pasusd|  H0|
|  H0|  H0|atobtc|2019-08-02|2019-08-01|pasusd|          13999.98|               0|               0.0|               0.0|          13999.98|           pasusd|  K1|
|  H0|  H0|atobtc|2019-08-03|2019-08-02|etheur|        10.0459161|               0|               0.0|               0.0|        10.0459161|           etheur|  H0|
|  K1|  H0|atobt

150

In [31]:
#Empfehlung aus dem Vergleich der beiden IndikatorEmpfehlungen ermitteln
empfehlung = spark.sql (
    "SELECT *, " \
    " CASE " \
    "   WHEN indE IS NULL THEN indA " \
    "   WHEN (substring(indA,1,1) = substring(indE,1,1)) THEN concat(substring(indA,1,1), '3') " \
    "   WHEN (substring(indA,1,1) = 'V' AND substring(indE,1,1) = 'K') " \
    "   OR (substring(indA,1,1) = 'K' AND substring(indE,1,1) = 'V') THEN 'H0' " \
    "   ELSE concat(substring(indE,1,1), '0') " \
    "   END AS Empfehlung" \
    " FROM x"
)
#empfehlung.show()
empfehlung.createOrReplaceTempView("empfehlung")
empfehlung.count()

+----+----+------+----------+----------+------+------------------+----------------+------------------+------------------+------------------+----------+----+----------+
|indA|indB| nameA|      date|     dateB| nameB|           volumeB|signal_vergleich|        Mittelwert|           minimum|         maxVolume|Folgt_Kurs|indE|Empfehlung|
+----+----+------+----------+----------+------+------------------+----------------+------------------+------------------+------------------+----------+----+----------+
|  H0|  H0|atobtc|2019-08-01|2019-07-31|pasusd|      806.83109001|               0|               0.0|               0.0|      806.83109001|    pasusd|  H0|        H3|
|  H0|  H0|atobtc|2019-08-02|2019-08-01|pasusd|          13999.98|               0|               0.0|               0.0|          13999.98|    pasusd|  K1|        K0|
|  H0|  H0|atobtc|2019-08-03|2019-08-02|etheur|        10.0459161|               0|               0.0|               0.0|        10.0459161|    etheur|  H0|    

150

In [32]:
final = spark.sql(
    "SELECT a.filename, a.date, a.open, a.close, a.high, a.low, a.volume, b.Empfehlung " \
    "FROM empfehlung b LEFT JOIN rohdaten a " \
    "ON (a.filename = b.nameA AND a.date = b.date) " \
    "ORDER BY filename, date asc"
)
#final.show()
final.createOrReplaceTempView("finalTable")
final.count()

+--------+----------+--------------------+--------------------+--------------------+--------------------+-------------------+----------+
|filename|      date|                open|               close|                high|                 low|             volume|Empfehlung|
+--------+----------+--------------------+--------------------+--------------------+--------------------+-------------------+----------+
|  atobtc|2019-08-01|           3.4799E-4|           3.4799E-4|           3.4799E-4|           3.4799E-4|         8.46446851|        H3|
|  atobtc|2019-08-02|           3.4508E-4|           3.4508E-4|           3.4508E-4|           3.4508E-4| 171.70999999999995|        K0|
|  atobtc|2019-08-03|            3.448E-4|            3.448E-4|            3.448E-4|            3.448E-4|         2.90873186|        H3|
|  atobtc|2019-08-04|            3.425E-4|            3.425E-4|            3.425E-4|            3.425E-4|              41.38|        K3|
|  atobtc|2019-08-05|           3.1854E-4

150

---
# Export
---

In [33]:
# benötigte Imports für den Export

!apt-get install tree
# Für File-Download / Drive-Mount
from google.colab import files, drive

# Um Zip-Datein zu erstellen
import shutil

# Für mergen
from glob import glob


from pyspark.sql.functions import date_format

import numpy
from numpy import array

Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following package was automatically installed and is no longer required:
  libnvidia-common-430
Use 'apt autoremove' to remove it.
The following NEW packages will be installed:
  tree
0 upgraded, 1 newly installed, 0 to remove and 25 not upgraded.
Need to get 40.7 kB of archives.
After this operation, 105 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu bionic/universe amd64 tree amd64 1.7.0-5 [40.7 kB]
Fetched 40.7 kB in 0s (107 kB/s)
Selecting previously unselected package tree.
(Reading database ... 134582 files and directories currently installed.)
Preparing to unpack .../tree_1.7.0-5_amd64.deb ...
Unpacking tree (1.7.0-5) ...
Setting up tree (1.7.0-5) ...
Processing triggers for man-db (2.8.3-2ubuntu0.1) ...


In [34]:
# GDrive Mounten:
if not os.path.exists('drive'):
  os.mkdir('drive')
drive.mount('drive')
mainordner_Pfad = os.path.join('drive', 'My Drive', 'ABDA2019')

# Alten Ordner ggf. löschen:
if os.path.exists(os.path.join(mainordner_Pfad, 'warehouse', 'kursvergleich')):
#  if not Mount_GDrive:
#    print("Vorhandener Datensatz entfernt.")
    shutil.rmtree(os.path.join(mainordner_Pfad, 'warehouse', 'kursvergleich'))

# Ordner-Checks (ggf. Erstellen der Ordner):
if not os.path.exists(mainordner_Pfad):
  os.mkdir(mainordner_Pfad)

if not os.path.exists(os.path.join(mainordner_Pfad, 'warehouse')):
  os.mkdir(os.path.join(mainordner_Pfad, 'warehouse'))

if not os.path.exists(os.path.join(mainordner_Pfad, 'warehouse', 'kursvergleich')):
  os.mkdir(os.path.join(mainordner_Pfad, 'warehouse', 'kursvergleich'))

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at drive


In [35]:

exportData = spark.sql(
    "SELECT filename, Date(date) date, open, close, high, low, volume, Empfehlung " \
    "FROM finalTable "
).withColumn("date", date_format(col("date"), "yyyy-MM-dd"))
exportData.collect()



[Row(filename='atobtc', date='2019-08-01', open=0.00034799, close=0.00034799, high=0.00034799, low=0.00034799, volume=8.46446851, Empfehlung='H3'),
 Row(filename='atobtc', date='2019-08-02', open=0.00034508, close=0.00034508, high=0.00034508, low=0.00034508, volume=171.70999999999995, Empfehlung='K0'),
 Row(filename='atobtc', date='2019-08-03', open=0.0003448, close=0.0003448, high=0.0003448, low=0.0003448, volume=2.90873186, Empfehlung='H3'),
 Row(filename='atobtc', date='2019-08-04', open=0.0003425, close=0.0003425, high=0.0003425, low=0.0003425, volume=41.38, Empfehlung='K3'),
 Row(filename='atobtc', date='2019-08-05', open=0.00031854, close=0.00031854, high=0.00031854, low=0.00031854, volume=0.00395176, Empfehlung='K0'),
 Row(filename='atobtc', date='2019-08-06', open=0.00030939, close=0.00030939, high=0.00030939, low=0.00030939, volume=11.51851971, Empfehlung='H3'),
 Row(filename='atobtc', date='2019-08-07', open=0.00029679, close=0.00029679, high=0.00029679, low=0.00029679, volum

In [0]:
exportDays = exportData.select(exportData.date).distinct()
exportDaysAr = array([row.date for row in exportDays.collect()])

In [0]:
for date in exportDaysAr:
  currentPath = os.path.join(mainordner_Pfad,'warehouse','kursvergleich', date)

  # Falls der Ordner im Aufbau war:
  temporaryPath = os.path.join(currentPath, '_temporary')
  if os.path.exists(temporaryPath):
    shutil.rmtree(currentPath)

  # Nur Schreiben wenn noch nicht vorhanden:
  if not os.path.exists(currentPath):
    # Output daten
    currentData = exportData.where(exportData.date == date)
    currentData.collect();

  currentData.write.csv(currentPath, mode = 'overwrite', encoding = 'utf-8', header = True )


In [0]:
# Mergen

for date in exportDaysAr:
  path = os.path.join(mainordner_Pfad, 'warehouse', 'kursvergleich', date)
  filename = os.path.join(mainordner_Pfad, 'warehouse', 'kursvergleich', (date + '.csv'))

  if os.path.exists(path) and not os.path.exists(filename):
    with open(filename, 'a') as singleFile:
      first_csv = True
      for csv in glob((path + '/*.csv')):
        if csv == filename:
          pass
        else:
          header = True

          for line in open(csv, 'r'):
            if first_csv and header:
              singleFile.write(line)
              first_csv = False
              header = False
            elif header:
              header = False
            else:
              singleFile.write(line)
      
      singleFile.close()

      if os.path.exists(path):
        shutil.rmtree(path)
      

In [0]:
#Download


shutil.make_archive('Exportdaten', 'zip', os.path.join(mainordner_Pfad, 'warehouse', 'kursvergleich'))
files.download('/content/Exportdaten.zip')