In [2]:
%run Constant.ipynb
%run DataSource.ipynb

class Metric:
            
    # symbol=SZ#002335
    def __init__(self, symbol, level, short, mid):
        self.dataSource = DataSource(symbol,level)

        self.short = short
        self.mid = mid
            
        self.data = self.dataSource.get_data()
       
    
    def next_series(self,cur_date, latest_n):
        cur_index = self.data[self.data['date']==cur_date].index.values[0]
        end_index = cur_index.astype(int)+latest_n

        part = self.data[cur_index:end_index]
        return part

        
    def previous_series(self,cur_date,latest_n):
        cur_index = self.data[self.data['date']==cur_date].index.values[0]
        start_index = cur_index.astype(int)-latest_n+1

        latest_part = self.data[start_index:cur_index+1]
            
        return latest_part
        
    def ma(self,cur_date,periods):
        periods_part = self.previous_series(cur_date, periods)['close']

        return round(periods_part.mean(),2)
    
    def std(self,cur_date, periods):
        periods_part = self.previous_series(cur_date, periods)['close']
        return np.std(periods_part)


    # {参数 N: 2，250，20 }
    # BOLL:MA(CLOSE,M); UB:BOLL+2*STD(CLOSE,M); LB:BOLL-2*STD(CLOSE,M);
    def boll(self,cur_date):
        M=20
        price = self.data[self.data['date']==cur_date]['close'].values[0]

        boll = self.ma(cur_date, M)
        up_boll = boll + 2 * (self.std(cur_date,M))
        low_boll = boll - 2 * (self.std(cur_date,M))
        return boll

    
    def is_down_boll(self, cur_date):
        boll_value = self.boll(cur_date)

        price = self.data[self.data['date']==cur_date]['close'].values[0]

        if(price<boll_value):
            return True
        else:
            return False
        

    def is_on_boll(self,cur_date):
        boll_value = self.boll(cur_date)

        price = self.data[self.data['date']==cur_date]['close'].values[0]

        if(price>boll_value):
            return True
        else:
            return False
        
        
    def is_down_cross(self,cur_date):
            cur_ma_short = self.ma(cur_date,self.short)
            cur_ma_mid = self.ma(cur_date,self.mid)

            previous_part = self.previous_series(cur_date,2)
            if(len(previous_part)==0):
                return False

            last_day = previous_part['date'].values[0]

            last_day_ma_short = self.ma(last_day,self.short)
            last_day_ma_mid = self.ma(last_day,self.mid)

            if((cur_ma_short<cur_ma_mid) and (last_day_ma_short>=last_day_ma_mid)):
                return True
            else:
                return False
        
    def is_up_cross(self,cur_date):
        cur_ma_short = self.ma(cur_date,self.short)
        cur_ma_mid = self.ma(cur_date,self.mid)
  
        previous_part = self.previous_series(cur_date,2)
        
        if(len(previous_part)==0):
            return False
        
        last_day = previous_part['date'].values[0]
        
        last_day_ma_short = self.ma(last_day,self.short)
        last_day_ma_mid = self.ma(last_day,self.mid)
        
        if((cur_ma_short>cur_ma_mid) and (last_day_ma_short<=last_day_ma_mid)):
            return True
        else:
            return False

    def k_on_ma(self,cur_date, latest_n_k=3):
        latest_n_k_close = self.previous_series(cur_date,latest_n_k)['close']
        flag=True

        for c in latest_n_k_close:
            if(c<self.ma(cur_date,short)):
                return False

        return flag


    def latest_n_period_up_crossed(self,cur_date, latest_n=3):
        latest_part = self.previous_series(cur_date, latest_n)['date'].values

        cross_array = []
        for i in range(0,latest_n):
            date = latest_part[i]
            up_cross = self.is_up_cross(date, short, mid)

            if(up_cross == True):
                return True

        return False


    def latest_n_k_break_ma(self,cur_date, latest_n_k=3):
        latest_n_k_close = self.previous_series(cur_date,latest_n_k)['close']

        flag=True

        for c in latest_n_k_close:
            if(c>self.ma(cur_date, self.mid)):
                return False

        if(debug):
            print(cur_date + ' ' + str(flag))
        return flag
    
    def deep_k_break_ma(self,cur_date, deep_percent=3):
        price = self.data[self.data['date']==cur_date]['close'].values[0]
        ma_price = self.ma(cur_date, self.mid)
        if((ma_price-price)*100/price>deep_percent):
            return True

        return False
    
    def get_cur_price(self, cur_date):
        price = self.data[self.data['date']==cur_date]['close'].values[0]
        return price

    def get_vol(self,cur_date):
        vol = self.data[self.data['date']==cur_date]['volume'].values[0]
        return vol

    def ma_vol(self,cur_date, periods=60):
        periods_part = self.previous_series(cur_date, periods)['volume']
        return periods_part.mean()

    def is_vol_outburst_point(self,cur_date, threshold=2, periods=60):
        vol = self.get_vol(cur_date)
        ma_vol_num = self.ma_vol(cur_date, periods)
        vol_rate = vol/ ma_vol_num

        if(vol_rate>threshold):
            return True
        else:
            return False

    # LARGE_VOL = SUM(VOL,5)/5/MA(VOL,60)    
    def is_latest_n_vol_outburst(self,cur_date, latest_n=5, threshold=2, periods=60):
        latest_part = self.previous_series(cur_date,latest_n)

        sum_vol = sum(latest_part['volume'])

        vol_rate = sum_vol/latest_n/self.ma_vol(cur_date, periods)

        if(vol_rate>threshold):
            return True
        else:
            return False


    # LARGE_VOL:=SUM(VOL,5)/5/MA(VOL,60)
    # COUNT(LARGE_VOL>2,10)>5
    # 成交量异动
    def is_outburst(self,cur_date, latest_n = 10, periods=60):
        latest_part = self.previous_series(cur_date,latest_n)

        count = 0
        for index,row in latest_part.iterrows():
            if(self.is_vol_outburst_point(row['date'])):
                count += 1

        if(count>5):
            return True
        else:
            return False

    def is_up_cross_outburst(self,cur_date, latest_n=10):
        latest_part = self.previous_series(cur_date, latest_n)

        for index,row in latest_part.iterrows():
            date = row['date']

            if(self.is_outburst(date)):
                return True

        next_part = self.next_periods_series(cur_date, latest_n)

        for index,row in next_part.iterrows():
            date = row['date']

            if(self.is_outburst(date)):
                return True

        return False
    
    
#     最早的那根破ma的K线
    def next_down_ma(self, periods, next_sections):
        the_first=True
        for index,row in next_sections.iterrows():
            if(the_first==True):
                the_first = False
                continue
                
            date = row['date']
            if(self.latest_n_k_break_ma(date)):
                return date
            

        return 'None'
    
    def next_down_boll(self, next_sections):
        for index,row in next_sections.iterrows():
            date = row['date']
            if(self.is_down_boll(date)):
                return date

        return 'None'
        

    def next_down_cross(self, next_sections):
        for index,row in next_sections.iterrows():
            date = row['date']
            if(self.is_down_cross(date)):
                return date

        return 'None'

    def previous_down_cross(self,next_sections):
        reverse_section = next_sections.iloc[::-1]

        for index,row in reverse_section.iterrows():
            date = row['date']
            if(self.is_down_cross(date)):
                return date

        return 'None'
    
    
    def is_always_decrease(self, part):
        for i in range(0, len(part)):
            if(i==0):
                continue
            if(part[i]>part[i-1]):
                return False

        return True
        
    
    def is_always_increase(self, part):
        for i in range(0, len(part)):
            if(i==0):
                continue
            if(part[i]<part[i-1]):
                return False

        return True
            
    
    def ma_direction(self, cur_date, periods, latest_n=5):
        pre_dates = self.previous_series(cur_date, latest_n)['date'].values
        
        if(len(pre_dates)<3):
            return ma_unknow_direction
        
        previous_ma=[]
        for date in pre_dates:
            previous_ma.append(self.ma(date, periods))
            
        if(self.is_always_increase(previous_ma)):
            return ma_upturn_direction
        
        if(self.is_always_decrease(previous_ma)):
            return ma_downturn_direction
        
        return ma_unknow_direction
            
        
        