In [12]:
from datetime import datetime, timedelta
import sqlite3


def connect_db():
    try:
        conn = sqlite3.connect('habit.db')

    except ConnectionError as ex:
        conn = None
        print(ex)

    return conn


class Habit:
    periods = {1: 'daily', 2: 'weekly', 3: 'monthly', 4: 'yearly'}

    def __init__(self, habit, description, period, duration=60, is_template=0):

        if habit is None or len(str(habit).strip()) == 0:
            raise ValueError('Habit can not be empty')
        self.habit = habit

        if description is None or len(str(description).strip()) == 0:
            raise ValueError('You must describe the habit')
        self.description = description

        if period not in Habit.periods:
            raise ValueError('Period must be daily, weekly, monthly or yearly')
        self.period = period

        self.duration = duration
        self.created = datetime.now()
        self.completed = False
        self.closed = False
        self.last_completion_date = None
        self.is_template = is_template
        self.habits_id = None
        self.habits_id = self.get_habits_id()

        # check if the Habit is still open
        # status = self.check_duration()
        # print(status)

    @staticmethod
    def get_instance(hid):
        """ get the habit instance from the database """
        period = None
        try:
            conn = connect_db()
            cur = conn.cursor()
            rs = cur.execute(f"SELECT * FROM habits WHERE habits_id = {hid} ORDER BY last_completion_date DESC")
            rows = list(rs.fetchone())
        except sqlite3.Error as ex:
            print(ex)

        # create a new habit instance
        try:
            habit = Habit(rows[1], rows[2], rows[3], rows[4], rows[8])
            habit.habits_id = rows[0]
            habit.created = rows[5]
            habit.closed = rows[6]
            habit.last_completion_date = rows[7]
            return habit
        except ValueError as ex:
            print(ex)

    def get_habits_id(self):
        """ get the habits_id from the database """

        if self.habits_id is None:
            # connect to DB and get current record
            try:
                conn = connect_db()
                cur = conn.cursor()
            except ConnectionError as ex:
                conn.close()
                print(ex)
            rs = cur.execute("""SELECT MAX(habits_id) FROM habits """)
            nxt = rs.fetchone()[0]
            nxt += 1
            conn.close()
            return nxt
        return self.habits_id

    def delete(self):
        """ delete the habit from the database """
        try:
            conn = connect_db()
            cur = conn.cursor()
            cur.execute(f"DELETE FROM habits WHERE habits_id = {self.habits_id}")
            conf = input(f'Are you sure you want to delete the habit {self.habit}? (y/n)\n')
            if conf == 'y':
                conn.commit()
                print(f'Habit {self.habit} has been deleted.')
            else:
                conn.rollback()
                print('Action cancelled.')
        except ConnectionError as ex:
            print(ex)
            conn.close()

    def check_duration(self):
        """ check if the habit is still open """
        date_format = "%Y-%m-%d %H:%M:%S.%f"
        a = datetime.today()
        b = datetime.strptime(self.created, date_format)
        diff = a - b

        # diff_days = int(diff / timedelta(days=1))
        if self.duration <= diff.days and self.closed == False:
            self.closed = True
            conn = connect_db()
            cur = conn.cursor()
            try:
                cur.execute(""" UPDATE habits SET closed = 1 WHERE habits_id = ?""", str(self.habits_id))
                conn.commit()
                # close connection
                conn.close()
            except ConnectionError as ex:
                conn.close()
                print(ex)
        days_left = int(self.duration) - int(diff.days)
        print(f'You have {days_left} days left to complete the task')
        return diff.days  # return the number of days left

    def complete_today(self):
        """ mark the habit as completed """
        if self.last_completion_date is not None:
            if self.last_completion_date[0: 10] == datetime.now().strftime("%Y-%m-%d"):  # check if habit is already completed today
                raise ValueError('The habit is already completed today')

        self.check_duration()
        self.last_completion_date = datetime.strptime(datetime.now().strftime("%Y-%m-%d %H:%M"), "%Y-%m-%d %H:%M")
        self.save_to_db()

    def save_to_db(self):
        """ save changes to database """
        try:
            conn = connect_db()
            cur = conn.cursor()
            habit_data = [
                (self.habits_id, self.habit, self.description, self.period, self.duration, self.created, self.closed,
                 self.last_completion_date, self.is_template)
            ]
            cur.executemany("""INSERT INTO habits  VALUES (?,?,?,?,?,?,?,?,?)""", habit_data)
        except ConnectionError as ex:
            print(ex)

        try:
            conn.commit()
            # close connection
            conn.close()
        except ConnectionError as ex:
            print(ex)
        print(f'Habit {self.habit} has been saved to the database.')


In [13]:
h = Habit.get_instance(3)

In [14]:
h.check_duration()

You have 44 days left to complete the task


16

In [15]:
h.duration

60

In [16]:
h.complete_today()

ValueError: The habit is already completed today

In [19]:
h2 = Habit.get_instance(2)

In [20]:
h2.check_duration()

You have 14 days left to complete the task


16

In [1]:
from datetime import datetime

import click
import sqlite3

from tabulate import tabulate

from src.habit import connect_db


def get_cursor():
    """
    Get a cursor to the database
    :return: cursor
    """
    try:
        conn = connect_db()
        c = conn.cursor()
    except ConnectionError as ex:
        print(ex)
    return c


@click.group(name="analytics")
def analytic_group():
    """ Commands for your analysis"""


@click.command(name='all-habits')
def show_all_habits():
    """list all habits"""

    # define output format
    table_format = 'fancy_outline'
    first_row = ['id', 'name', 'description', 'period', 'duration in days']
    try:
        cur = get_cursor()
        rs = cur.execute(
            """ SELECT DISTINCT h.habits_id,h.name,h.description,p.name,h.duration FROM habits as h
            INNER JOIN periods as p ON h.periods_fk = p.periods_id  WHERE closed == FALSE ORDER BY habits_id ASC """)
    except ConnectionError as ex:
        print(ex)

    rows = list(rs.fetchall())
    print(tabulate(rows, headers=first_row, tablefmt=table_format))


@click.command(name='list-all-activities')
def show_all_activities():
    """list all activities for all habit"""

    # define output format
    table_format = 'fancy_outline'
    first_row = ['id', 'name', 'description', 'period', 'duration in days', 'last completion date']
    try:
        cur = get_cursor()
        rs = cur.execute(
            """ SELECT  h.habits_id,h.name,h.description,p.name,h.duration, h.last_completion_date FROM habits as h
            INNER JOIN periods as p ON h.periods_fk = p.periods_id  WHERE closed == FALSE ORDER BY habits_id ASC, last_completion_date DESC """)

    except ConnectionError as ex:
        print(ex)

    rows = list(rs.fetchall())
    print(tabulate(rows, headers=first_row, tablefmt=table_format))


@click.command(name='streak-all')
def longest_streak_all():
    """show longest streak of all habits"""

    # define output format
    dateformat = "%Y-%m-%d %H:%M:%S"
    streak = 1
    longest = 1
    h_name = None
    max_habit = None
    try:
        cur = get_cursor()
        rs = cur.execute(
            """ SELECT habits_id, name,periods_fk, last_completion_date FROM habits WHERE last_completion_date not null ORDER BY habits_id ASC """)
        rows = rs.fetchall()
    except sqlite3.Error as ex:
        print(ex)

    for row in rows:
        # helping variables
        first_compl = row[3]
        habit_name = row[1]
        period = row[2]
        if habit_name != h_name:  # if new habit
            streak = 0
        if streak > longest:  # if new longest streak
            longest = streak
            max_habit = h_name
        match period:
            case 1:  # daily
                first = datetime.strptime(first_compl, dateformat)
                current = datetime.strptime(row[3], dateformat)
                delta = current - first
                h_name = habit_name
                if delta.days <= 1:
                    streak += 1
                else:
                    streak = 0
                    continue
            case 2:  # weekly
                first = datetime.strptime(first_compl, dateformat)
                current = datetime.strptime(row[3], dateformat)
                delta = current - first
                h_name = habit_name
                if delta.days <= 7:
                    streak += 1
                else:
                    streak = 0
                    continue
            case 3:  # monthly
                first = datetime.strptime(first_compl, dateformat)
                current = datetime.strptime(row[3], dateformat)
                delta = current - first
                h_name = habit_name
                if delta.days <= 30:
                    streak += 1
                else:
                    streak = 0
                    continue
            case 4:  # yearly
                first = datetime.strptime(first_compl, dateformat)
                current = datetime.strptime(row[3], dateformat)
                delta = current - first
                h_name = habit_name
                if delta.days <= 365:
                    streak += 1
                else:
                    streak = 0
                    continue

    # Output
    print(f'The longest streak is for your habit {max_habit} with {longest} times')


@click.command(name='streak')
@click.argument('habit_id', type=click.IntRange(1))
def longest_streak_habit(habit_id):
    """display longest streak of a habit"""

    # define output format
    dateformat = "%Y-%m-%d %H:%M:%S"
    streak = 1
    longest = 1
    try:
        # database connection
        cur = get_cursor()
        rs = cur.execute(
            """ SELECT habits_id, name,periods_fk, last_completion_date FROM habits WHERE habits_id = ? AND last_completion_date not null  """,
            str(habit_id))
        rows = rs.fetchall()
    except sqlite3.Error as ex:
        print(ex)
    data = rows[0]  # get first row
    period = data[2]
    first_compl = data[3]
    habit_name = data[1]

    for row in rows:  # loop through all rows
        if streak > longest:
            longest = streak
        match period:  # check period
            case 1:  # daily
                first = datetime.strptime(first_compl, dateformat)
                current = datetime.strptime(row[3], dateformat)
                delta = current - first
                if delta.days <= 1:
                    streak += 1
                else:
                    streak = 0
                    continue
            case 2:  # weekly
                first = datetime.strptime(first_compl, dateformat)
                current = datetime.strptime(row[3], dateformat)
                delta = current - first
                if delta.days <= 7:
                    streak += 1
                else:
                    streak = 0
                    continue
            case 3:  # monthly
                first = datetime.strptime(first_compl, dateformat)
                current = datetime.strptime(row[3], dateformat)
                delta = current - first
                if delta.days <= 30:
                    streak += 1
                else:
                    streak = 0
                    continue
            case 4:  # yearly
                first = datetime.strptime(first_compl, dateformat)
                current = datetime.strptime(row[3], dateformat)
                delta = current - first
                if delta.days <= 365:
                    streak += 1
                else:
                    streak = 0
                    continue
    # Output
    print(f'The longest streak for your habit {habit_name} is {longest} times')


@click.command(name='periodicy')
@click.argument("period", type=click.IntRange(1, 4))
def same_periodicy(period):
    """list all habits with the same periodicy"""

    # define output format
    table_format = 'fancy_outline'
    first_row = ['id', 'name', 'description', 'period', 'duration']
    try:
        cur = get_cursor()
        rs = cur.execute(""" SELECT DISTINCT habits_id,name,description,periods_fk,duration FROM habits
        WHERE periods_fk = ?""", str(period))
    except ConnectionError as ex:
        print(ex)

    rows = list(rs.fetchall())
    print(tabulate(rows, headers=first_row, tablefmt=table_format))


# define the group for click commands

analytic_group.add_command(show_all_habits)
analytic_group.add_command(show_all_activities)
analytic_group.add_command(longest_streak_all)
analytic_group.add_command(longest_streak_habit)
analytic_group.add_command(same_periodicy)


In [5]:
same_periodicy(1)

TypeError: 'int' object is not iterable

In [7]:
longest_streak_habit('3')

The longest streak for your habit Coffee is 4 times


AssertionError: 

In [9]:
same_periodicy('1')

╒══════╤════════╤═══════════════════╤══════════╤════════════╕
│   id │ name   │ description       │   period │   duration │
╞══════╪════════╪═══════════════════╪══════════╪════════════╡
│    2 │ Tea    │ drink more Tea    │        1 │         30 │
│    3 │ Coffee │ drink less coffee │        1 │         60 │
╘══════╧════════╧═══════════════════╧══════════╧════════════╛


AssertionError: 