In [1]:
import numpy as np

In [2]:


def sigmoid(x):
    """Logistic sigmoid ``1 / (1 + exp(-x))``, evaluated without overflow.

    Args:
        x: Scalar or array-like of real numbers.

    Returns:
        A NumPy scalar for scalar input, otherwise an array with the same
        shape as ``x``, with every value in ``[0, 1]``.
    """
    x = np.asarray(x)
    # exp(-|x|) always lies in (0, 1], so it cannot overflow the way exp(-x)
    # does for large negative x.
    decay = np.exp(-np.abs(x))
    # Two algebraically equal forms of the sigmoid; pick per element the one
    # that only needs exp of a non-positive argument.
    result = np.where(x >= 0, 1 / (1 + decay), decay / (1 + decay))
    # np.where yields a 0-d array for scalar input; unwrap it so scalars stay scalars.
    return result[()] if result.ndim == 0 else result

def tanh(x):
    """Hyperbolic tangent of ``x``, delegated element-wise to ``np.tanh``."""
    squashed = np.tanh(x)
    return squashed

def sigmoidDerivative(x):
    """Derivative of the logistic sigmoid at ``x``: ``sigmoid(x) * (1 - sigmoid(x))``."""
    activation = sigmoid(x)
    complement = 1 - activation
    return activation * complement

def tanhDerivative(x):
    """Derivative of the hyperbolic tangent at ``x``: ``1 - tanh(x)^2``."""
    squashed = np.tanh(x)
    return 1 - squashed ** 2

In [3]:
class LSTM:
    """Single-layer LSTM over sequences of integer indices, trained with plain SGD.

    Every input index is one-hot encoded into an ``(inputSize, 1)`` column,
    passed through forget / input / candidate / output gates, and projected
    to ``outputSize`` logits. Training minimises the softmax cross-entropy
    summed over the sequence, back-propagating through every time step and
    clipping each gradient element to ``[-5, 5]`` before the update.

    The class relies on the module-level ``sigmoid`` and ``tanh`` helpers.
    """

    def __init__(self, inputSize, hiddenSize, outputSize, learningRate=0.001):
        """Create the parameters: small random weights and zero biases.

        Args:
            inputSize: Length of the one-hot input vectors.
            hiddenSize: Number of hidden / cell units.
            outputSize: Number of output logits.
            learningRate: Step size of the gradient-descent update.
        """
        self.inputSize = inputSize
        self.hiddenSize = hiddenSize
        self.outputSize = outputSize
        self.learningRate = learningRate

        # Each gate sees the previous hidden state stacked on top of the input.
        gateShape = (hiddenSize, inputSize + hiddenSize)

        self.weightForget = np.random.randn(*gateShape) * 0.01
        self.biasForget = np.zeros((hiddenSize, 1))

        self.weightInput = np.random.randn(*gateShape) * 0.01
        self.biasInput = np.zeros((hiddenSize, 1))

        self.weightCandidate = np.random.randn(*gateShape) * 0.01
        self.biasCandidate = np.zeros((hiddenSize, 1))

        self.weightOutput = np.random.randn(*gateShape) * 0.01
        self.biasOutput = np.zeros((hiddenSize, 1))

        self.weightY = np.random.randn(outputSize, hiddenSize) * 0.01
        self.biasY = np.zeros((outputSize, 1))

    @staticmethod
    def _softmax(logits):
        """Softmax of a logit column, shifted by its maximum so ``exp`` cannot overflow."""
        shifted = logits - np.max(logits)
        expShifted = np.exp(shifted)
        return expShifted / np.sum(expShifted)

    @staticmethod
    def _crossEntropy(logits, target):
        """Negative log softmax probability of ``target``, computed via log-sum-exp."""
        shifted = logits - np.max(logits)
        return np.log(np.sum(np.exp(shifted))) - shifted[target, 0]

    def _oneHot(self, index):
        """Return an ``(inputSize, 1)`` column of zeros with a 1 at ``index``."""
        vector = np.zeros((self.inputSize, 1))
        vector[index] = 1
        return vector

    def _step(self, inputVec, hiddenPrev, cellPrev):
        """Advance the cell by one time step.

        Returns:
            ``(forgetGate, inputGate, candidate, outputGate, cell, hidden, logits)``.
        """
        # Hidden state first, then the input: this matches the column layout
        # backward() assumes when it slices the gradient of the stacked vector.
        concat = np.vstack((hiddenPrev, inputVec))

        forgetGate = sigmoid(np.dot(self.weightForget, concat) + self.biasForget)
        inputGate = sigmoid(np.dot(self.weightInput, concat) + self.biasInput)
        candidate = tanh(np.dot(self.weightCandidate, concat) + self.biasCandidate)
        outputGate = sigmoid(np.dot(self.weightOutput, concat) + self.biasOutput)

        cell = forgetGate * cellPrev + inputGate * candidate
        hidden = outputGate * tanh(cell)
        logits = np.dot(self.weightY, hidden) + self.biasY
        return forgetGate, inputGate, candidate, outputGate, cell, hidden, logits

    def forward(self, inputs):
        """Run the network over ``inputs`` and cache every intermediate value.

        Args:
            inputs: Sequence of integer indices, each valid for a vector of
                length ``inputSize``.

        Returns:
            Dict mapping time step ``t`` to the ``(outputSize, 1)`` logits.
            The same dict is stored in ``self.cache`` together with the
            states and gate activations that ``backward`` needs.
        """
        inputVectors, hiddenStates, cellStates, outputs = {}, {}, {}, {}
        forgetGates, inputGates, candidateValues, outputGates = {}, {}, {}, {}

        # Key -1 holds the all-zero initial state.
        hiddenStates[-1] = np.zeros((self.hiddenSize, 1))
        cellStates[-1] = np.zeros((self.hiddenSize, 1))

        for t, index in enumerate(inputs):
            inputVectors[t] = self._oneHot(index)
            (forgetGates[t], inputGates[t], candidateValues[t], outputGates[t],
             cellStates[t], hiddenStates[t], outputs[t]) = self._step(
                inputVectors[t], hiddenStates[t - 1], cellStates[t - 1])

        self.cache = (inputVectors, hiddenStates, cellStates, outputs,
                      forgetGates, inputGates, candidateValues, outputGates)
        return outputs

    def train(self, data, targets, epochs=100):
        """Fit the network with one full-sequence gradient step per epoch.

        Args:
            data: Input index sequence passed to ``forward``.
            targets: Target index per time step; must not be longer than ``data``.
            epochs: Number of forward/backward passes.

        Prints the summed cross-entropy loss every 10th epoch. The reported
        loss is the one of the forward pass, i.e. before that epoch's update.
        """
        for epoch in range(epochs):
            outputs = self.forward(data)
            self.backward(targets)

            loss = 0
            for t in range(len(targets)):
                loss += self._crossEntropy(outputs[t], targets[t])

            if epoch % 10 == 0:
                print(f"Epoch {epoch}, Loss: {loss}")

    def predict(self, seedIdx, nChars):
        """Sample ``nChars`` indices, starting from ``seedIdx`` and zero state.

        Every sampled index is fed back as the next one-hot input, so each
        index in ``range(outputSize)`` must also be a valid position in a
        vector of length ``inputSize``.

        Args:
            seedIdx: Index of the first input.
            nChars: Number of indices to generate.

        Returns:
            List of sampled indices, drawn with ``np.random.choice`` from the
            softmax distribution of each step.
        """
        hidden = np.zeros((self.hiddenSize, 1))
        cell = np.zeros((self.hiddenSize, 1))
        inputVec = self._oneHot(seedIdx)
        indices = []

        for _step in range(nChars):
            _, _, _, _, cell, hidden, logits = self._step(inputVec, hidden, cell)

            probs = self._softmax(logits)
            idx = np.random.choice(range(self.outputSize), p=probs.ravel())

            inputVec = self._oneHot(idx)
            indices.append(idx)

        return indices

    def backward(self, targets):
        """Back-propagate through the cached forward pass and update all parameters.

        Args:
            targets: Target index per time step of the last ``forward`` call.

        Gradients are accumulated over all time steps, clipped element-wise
        to ``[-5, 5]`` and applied in place with step size ``learningRate``.
        """
        (inputVectors, hiddenStates, cellStates, outputs,
         forgetGates, inputGates, candidateValues, outputGates) = self.cache

        dWeightForget = np.zeros_like(self.weightForget)
        dWeightInput = np.zeros_like(self.weightInput)
        dWeightCandidate = np.zeros_like(self.weightCandidate)
        dWeightOutput = np.zeros_like(self.weightOutput)
        dWeightY = np.zeros_like(self.weightY)
        dBiasForget = np.zeros_like(self.biasForget)
        dBiasInput = np.zeros_like(self.biasInput)
        dBiasCandidate = np.zeros_like(self.biasCandidate)
        dBiasOutput = np.zeros_like(self.biasOutput)
        dBiasY = np.zeros_like(self.biasY)

        # Sized from hiddenSize rather than from hiddenStates[0] so that an
        # empty sequence is a no-op instead of a KeyError.
        dHiddenNext = np.zeros((self.hiddenSize, 1))
        dCellNext = np.zeros((self.hiddenSize, 1))

        for t in reversed(range(len(targets))):
            concat = np.vstack((hiddenStates[t - 1], inputVectors[t]))

            # Gradient of softmax cross-entropy w.r.t. the logits: probs - one_hot(target).
            dOutput = self._softmax(outputs[t])
            dOutput[targets[t]] -= 1

            dWeightY += np.dot(dOutput, hiddenStates[t].T)
            dBiasY += dOutput

            # Hidden state receives gradient from this step's logits and from step t + 1.
            dHidden = np.dot(self.weightY.T, dOutput) + dHiddenNext

            tanhCell = np.tanh(cellStates[t])
            dOutputGate = dHidden * tanhCell
            dCell = dHidden * outputGates[t] * (1 - tanhCell ** 2) + dCellNext

            dForgetGate = dCell * cellStates[t - 1]
            dInputGate = dCell * candidateValues[t]
            dCandidateValue = dCell * inputGates[t]

            # Push the gradients through the gate non-linearities, using the
            # cached activations (sigmoid' = s * (1 - s), tanh' = 1 - tanh^2).
            dOutputGateRaw = dOutputGate * outputGates[t] * (1 - outputGates[t])
            dForgetGateRaw = dForgetGate * forgetGates[t] * (1 - forgetGates[t])
            dInputGateRaw = dInputGate * inputGates[t] * (1 - inputGates[t])
            dCandidateValueRaw = dCandidateValue * (1 - candidateValues[t] ** 2)

            dWeightForget += np.dot(dForgetGateRaw, concat.T)
            dWeightInput += np.dot(dInputGateRaw, concat.T)
            dWeightCandidate += np.dot(dCandidateValueRaw, concat.T)
            dWeightOutput += np.dot(dOutputGateRaw, concat.T)
            dBiasForget += dForgetGateRaw
            dBiasInput += dInputGateRaw
            dBiasCandidate += dCandidateValueRaw
            dBiasOutput += dOutputGateRaw

            dConcat = (np.dot(self.weightForget.T, dForgetGateRaw)
                       + np.dot(self.weightInput.T, dInputGateRaw)
                       + np.dot(self.weightCandidate.T, dCandidateValueRaw)
                       + np.dot(self.weightOutput.T, dOutputGateRaw))

            # The first hiddenSize rows of the stacked vector are the previous hidden state.
            dHiddenNext = dConcat[:self.hiddenSize]
            dCellNext = dCell * forgetGates[t]

        updates = [
            (self.weightForget, dWeightForget),
            (self.weightInput, dWeightInput),
            (self.weightCandidate, dWeightCandidate),
            (self.weightOutput, dWeightOutput),
            (self.weightY, dWeightY),
            (self.biasForget, dBiasForget),
            (self.biasInput, dBiasInput),
            (self.biasCandidate, dBiasCandidate),
            (self.biasOutput, dBiasOutput),
            (self.biasY, dBiasY),
        ]
        for param, dparam in updates:
            np.clip(dparam, -5, 5, out=dparam)
            # In-place so the arrays held by the instance attributes are updated.
            param -= self.learningRate * dparam

In [4]:
import matplotlib.pyplot as plt
import pandas as pd

# Seed NumPy's global RNG so weight initialisation and the sampling done in
# LSTM.predict are repeatable across "Restart & Run All".
SEED = 42
np.random.seed(SEED)

df = pd.read_csv("custdata.csv")

# Keep only what is needed to compute per-mall daily revenue.
df = df.drop(['invoice_no', 'customer_id', 'category'], axis=1)
df['sales'] = df['quantity'] * df['price']
df['invoice_date'] = pd.to_datetime(df['invoice_date'], format='%d/%m/%Y')

dailySales = df.groupby(['invoice_date', 'shopping_mall'])['sales'].sum().reset_index()

malls = ['Kanyon', 'Forum Istanbul']
numBins = 10


def getBin(value):
    """Map a sales value to a bin index in ``[0, numBins - 1]``.

    Reads the module-level ``binEdges`` of the mall currently being processed.
    """
    # np.digitize is 1-based for in-range values and returns len(binEdges) for
    # the maximum (which sits on the last edge), hence the shift and the clamp.
    return min(max(np.digitize(value, binEdges) - 1, 0), numBins - 1)


def binToSales(b):
    """Return the midpoint of bin ``b`` under the current module-level ``binEdges``."""
    return (binEdges[b] + binEdges[b + 1]) / 2


for mall in malls:
    mallData = dailySales[dailySales['shopping_mall'] == mall].sort_values('invoice_date')
    sales = mallData['sales'].values

    if len(sales) < 4:
        # The first half must contain at least one (input, target) pair;
        # an empty subset would also make sales.min() raise.
        print(f"Skipping {mall}: only {len(sales)} daily records")
        continue

    # Discretise daily sales into numBins equal-width bins so the LSTM can
    # treat the series as a sequence of class indices.
    salesMin, salesMax = sales.min(), sales.max()
    binEdges = np.linspace(salesMin, salesMax, numBins + 1)

    sequence = [getBin(s) for s in sales]

    # Train on the first half of the series, forecast the second half.
    n = len(sequence)
    trainEnd = n // 2
    trainSeq = sequence[:trainEnd]

    # Next-step prediction: the target at step t is the input at step t + 1.
    data = trainSeq[:-1]
    targets = trainSeq[1:]

    model = LSTM(numBins, 128, numBins, 0.001)
    model.train(data, targets, epochs=10)

    # Generate the remainder of the series, seeded with the last training bin.
    predLength = n - trainEnd
    predictedBins = model.predict(trainSeq[-1], predLength)

    predictedSales = [binToSales(b) for b in predictedBins]
    actualSales = sales[trainEnd:]

    dates = mallData['invoice_date'].values
    fig, ax = plt.subplots()
    ax.plot(dates, sales, label="Actual")
    ax.plot(dates[trainEnd:], predictedSales, label="Predicted")
    ax.set(title=f"Daily sales: {mall}", xlabel="Date", ylabel="Sales")
    ax.legend()
    plt.show()

Epoch 0, Loss: 914.1107098053038

Trend for Kanyon:
     invoice_date     sales
4      2021-01-01  58495.73
14     2021-01-02  56415.39
24     2021-01-03  81463.92
34     2021-01-04  75300.34
44     2021-01-05  50942.92
...           ...       ...
7917   2023-03-04  94853.13
7927   2023-03-05  43080.33
7937   2023-03-06  93314.76
7947   2023-03-07  22907.25
7957   2023-03-08  83303.46

[797 rows x 2 columns]
Predicted: [np.float64(28358.835499999997), np.float64(109783.61749999998), np.float64(28358.835499999997), np.float64(123354.41449999997), np.float64(14788.038499999999), np.float64(96212.82049999997), np.float64(55500.429499999984), np.float64(96212.82049999997), np.float64(41929.63249999999), np.float64(109783.61749999998), np.float64(69071.22649999999), np.float64(14788.038499999999), np.float64(28358.835499999997), np.float64(82642.02349999998), np.float64(136925.21149999998), np.float64(96212.82049999997), np.float64(69071.22649999999), np.float64(55500.429499999984), np.floa