In [1]:
# Turn on IPython's greedy tab-completer option for this kernel session.
%config IPCompleter.greedy=True

In [2]:
import pandas as pd
import numpy as np
from pathlib import Path
import os
from threading import Thread
from queue import Queue
import multiprocessing

In [3]:
# Directory that holds the daily .xlsx workbooks. The trailing slash is required:
# file paths in this notebook are built by plain string concatenation.
# (No shell `export` is needed: each `!` command runs in its own subshell, and
# IPython substitutes `$xcelLocation` from this Python variable.)
xcelLocation = "../xcels/"

# Column names assigned positionally to every workbook after it is read.
HEADER = ["symbol", "name", "amount", "volume", "value", "lastday", "openning", "last-value",
 "last-change", "last-percent", "ending-value", "ending-change", "ending-percent",
 "min", "max",]

# Columns of the combined table: the per-workbook columns plus the date parts
# parsed from each workbook's file name.
HEADER_extra = HEADER + ["year", "month", "day"]

In [4]:
# Base names (extension stripped) of every .xlsx workbook in xcelLocation.
# pathlib is used instead of `ls | grep ".xlsx"`: that grep pattern is an unanchored
# regex, so it also matched names that merely contain "xlsx", and the pipeline left
# a scratch file behind in the working directory. Dot-files are skipped, as `ls` does.
names = sorted(
    path.stem
    for path in Path(xcelLocation).glob("*.xlsx")
    if path.is_file() and not path.name.startswith(".")
)

In [5]:
def cleaner(file_names=None, location=None, min_size=10000):
    """Delete every listed workbook whose size on disk is below ``min_size`` bytes.

    Parameters
    ----------
    file_names : iterable of str, optional
        Workbook base names without the ".xlsx" suffix. Defaults to the
        notebook-level ``names`` list.
    location : str, optional
        Directory prefix, concatenated as-is in front of each name. Defaults to
        the notebook-level ``xcelLocation``.
    min_size : int, optional
        Size threshold in bytes; files strictly smaller are removed.

    Returns
    -------
    None

    Notes
    -----
    Names whose file no longer exists are skipped, so the function can be called
    again with a list that still mentions files removed by an earlier call.
    """
    if file_names is None:
        file_names = names
    if location is None:
        location = xcelLocation
    for name in file_names:
        path = location + name + '.xlsx'
        try:
            if os.path.getsize(path) < min_size:
                os.remove(path)
        except FileNotFoundError:
            # Already gone (e.g. removed by a previous run) - nothing to clean.
            continue
# Prune undersized workbooks now. `names` is not rebuilt afterwards, so it can
# still list files that this call deleted.
cleaner()

In [6]:
def convert(xcelLocation, xlFileName, returnAllMode=False):
    """Read one workbook, rename its columns to ``HEADER`` and write it out as CSV.

    Parameters
    ----------
    xcelLocation : str
        Directory prefix (including trailing separator) of the workbook.
    xlFileName : str
        Workbook base name without extension; ".xlsx" is read, ".csv" is written.
    returnAllMode : bool, optional
        When False, a workbook whose CSV already exists is skipped. When True the
        workbook is always re-read and its CSV rewritten.

    Returns
    -------
    pandas.DataFrame
        The parsed sheet, when the workbook was read and written successfully.
    str
        ``str(xlFileName)`` when reading or writing failed with an ``Exception``.
    None
        When the CSV already exists and ``returnAllMode`` is False.

    Notes
    -----
    Only ``Exception`` subclasses are turned into the error-name return value;
    ``KeyboardInterrupt`` / ``SystemExit`` propagate so a run can be aborted.
    """
    try:
        csv_path = xcelLocation + xlFileName + '.csv'
        if Path(csv_path).is_file() and not returnAllMode:
            return None
        # The first two sheet rows are skipped; the next row is consumed as the
        # header and then overwritten by HEADER.
        xl = pd.read_excel(xcelLocation + xlFileName + ".xlsx",
                           header=[0], skiprows=[0, 1])
        xl.columns = HEADER
        xl.to_csv(csv_path, encoding='utf-8', index=False, header=HEADER)
    except Exception:
        return str(xlFileName)
    return xl

In [7]:
def convertThread(threadname, q, qDFs, qErrors):
    """Worker loop: drain batches of workbook names from ``q`` and convert each one.

    Parameters
    ----------
    threadname : object
        Label printed when the worker finishes.
    q : queue-like
        Source of work items; each item is an iterable of workbook base names.
    qDFs : queue-like
        Receives ``(DataFrame copy, name)`` for every workbook that converted.
    qErrors : queue-like
        Receives the name string ``convert`` returns for a failed workbook.

    Notes
    -----
    Work is fetched with a non-blocking get. Checking ``q.empty()`` and then calling
    a blocking ``q.get()`` is racy with several consumers: another worker can take
    the last item in between, leaving this one blocked forever.
    """
    # Local import: the notebook's import cell only brings in `Queue` from this module.
    from queue import Empty

    while True:
        try:
            fileNames = q.get_nowait()
        except Empty:
            break
        # Acknowledged right after the fetch, before the batch is processed.
        q.task_done()
        for name in fileNames:
            result = convert(xcelLocation=xcelLocation, xlFileName=name, returnAllMode=True)
            if isinstance(result, str):
                # convert() signals failure by returning the file name as a string.
                qErrors.put(result)
            else:
                qDFs.put((result.copy(), name))
    print(str(threadname) + " done")


In [8]:
# Fan the workbook names out to worker processes in batches, then collect the
# parsed frames (allDFs) and their matching file names (allDFNames).
allDFs = []
allDFNames = []
batchSize = 10
numThread = 16
workers = []

# Manager-backed queues are used so they can be handed to the worker processes
# as ordinary arguments.
m = multiprocessing.Manager()
queue = m.Queue()
qDFs = m.Queue()
qErrors = m.Queue()

# One queue item per batch of names. Slicing past the end of the list simply
# yields a shorter final batch, so the step is always `batchSize`.
for start in range(0, len(names), batchSize):
    queue.put(names[start:start + batchSize])
print(len(names))
print(queue.qsize())

for i in range(numThread):
    workers.append(multiprocessing.Process(target=convertThread,
                                           args=("Thread-" + str(i), queue, qDFs, qErrors)))
    workers[i].start()

for worker in workers:
    worker.join()

# All workers have exited, so qDFs is complete. Names of workbooks that failed
# to convert stay in qErrors, which is not drained here.
while not qDFs.empty():
    dftmp, nametmp = qDFs.get()
    allDFs.append(dftmp)
    allDFNames.append(nametmp)

4411
442
Thread-1 done
Thread-5 done
Thread-0 done
Thread-7 done
Thread-15 done
Thread-2 done
Thread-8 done
Thread-9 done
Thread-4 done
Thread-3 done
Thread-12 done
Thread-10 done
Thread-13 done
Thread-11 done
Thread-6 done
Thread-14 done


In [9]:
print(len(allDFs))
def makeMasterTable(chunkSize):
    """Combine every frame in ``allDFs`` into one date-sorted table and dump it as CSV chunks.

    Each frame gets ``year``/``month``/``day`` columns parsed from its entry in
    ``allDFNames`` (split on "-"). The frames are concatenated, sorted by those
    three columns, re-indexed from 0 and written to
    ``{xcelLocation}master{i}.csv`` in slices of ``chunkSize`` rows.

    Parameters
    ----------
    chunkSize : int
        Maximum number of rows per output CSV; must be positive.

    Returns
    -------
    pandas.DataFrame
        The full sorted table.

    Raises
    ------
    ValueError
        If ``chunkSize`` is not positive, if a name does not split into exactly
        three integer parts, or (raised by ``pd.concat``) if ``allDFs`` is empty.

    Notes
    -----
    The frames stored in ``allDFs`` are left unmodified; the date columns are
    added to copies.
    """
    if chunkSize <= 0:
        # A non-positive chunk size would never advance through the table.
        raise ValueError("chunkSize must be a positive number of rows")

    dated = []
    for df, stamp in zip(allDFs, allDFNames):
        year, month, day = stamp.split("-")
        # Scalars broadcast to every row; assign() returns a new frame.
        dated.append(df.assign(year=int(year), month=int(month), day=int(day)))

    xl = pd.concat(dated, ignore_index=True)
    xl.columns = HEADER_extra
    xl = xl.astype({"year": int, "month": int, "day": int})
    print(xl.dtypes)
    xl = xl.sort_values(by=['year', 'month', 'day']).reset_index(drop=True)

    for i, start in enumerate(range(0, len(xl), chunkSize)):
        xl.iloc[start:start + chunkSize].to_csv(
            '{xcelLocation}master{i}.csv'.format(i=i, xcelLocation=xcelLocation),
            header=HEADER_extra, encoding='utf-8', index=False)
    return xl
allDF = makeMasterTable(1000000)

4227
symbol             object
name               object
amount              int64
volume              int64
value               int64
lastday           float64
openning          float64
last-value        float64
last-change       float64
last-percent      float64
ending-value      float64
ending-change     float64
ending-percent    float64
min               float64
max               float64
year                int64
month               int64
day                 int64
dtype: object


In [10]:
# Display the master-table rows whose symbol column equals "خپارس".
allDF.loc[allDF["symbol"].eq("خپارس")]

Unnamed: 0,symbol,name,amount,volume,value,lastday,openning,last-value,last-change,last-percent,ending-value,ending-change,ending-percent,min,max,year,month,day
46771,خپارس,پارس‌ خودرو,318,561712,3608122553,6000.0,6000.0,6621.0,621.0,10.35,6621.0,621.0,10.35,4000.0,10000.0,1382,2,17
46907,خپارس,پارس‌ خودرو,94,124799,910907901,6952.0,7299.0,7299.0,347.0,4.99,7299.0,347.0,4.99,7299.0,7299.0,1382,2,21
47049,خپارس,پارس‌ خودرو,139,179535,1375776705,7299.0,7663.0,7663.0,364.0,4.99,7663.0,364.0,4.99,7663.0,7663.0,1382,2,22
47200,خپارس,پارس‌ خودرو,155,221434,1781657964,7663.0,8046.0,8046.0,383.0,5.00,8046.0,383.0,5.00,8046.0,8046.0,1382,2,23
47355,خپارس,پارس‌ خودرو,104,140655,1188253440,8046.0,8448.0,8448.0,402.0,5.00,8448.0,402.0,5.00,8448.0,8448.0,1382,2,24
47505,خپارس,پارس‌ خودرو,417,786860,6773780046,8448.0,8450.0,8444.0,-4.0,-0.05,8444.0,-4.0,-0.05,8402.0,8870.0,1382,2,27
47653,خپارس,پارس‌ خودرو,196,496179,3983067223,8444.0,8298.0,8022.0,-422.0,-5.00,8022.0,-422.0,-5.00,8022.0,8395.0,1382,2,28
47786,خپارس,پارس‌ خودرو,310,531907,4134590656,8022.0,8000.0,8020.0,-2.0,-0.02,8020.0,-2.0,-0.02,7621.0,8020.0,1382,2,30
47931,خپارس,پارس‌ خودرو,127,184518,1538550525,8020.0,8421.0,8421.0,401.0,5.00,8421.0,401.0,5.00,8020.0,8421.0,1382,2,31
48073,خپارس,پارس‌ خودرو,115,124564,1065380072,8421.0,8840.0,8397.0,-24.0,-0.29,8397.0,-24.0,-0.29,8159.0,8842.0,1382,3,3
